Commit 82dcf32d authored by Jeremy Hylton

Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.

Please make all future changes on the Zope-2_7-branch instead.
parent 5360717c
@@ -367,8 +367,14 @@ class ClientCache:
data = read(dlen)
self._trace(0x2A, oid, version, h[19:], dlen)
if (p < 0) != self._current:
+ # If the cache read we are copying has version info,
+ # we need to pass the header to copytocurrent().
+ if vlen:
+ vheader = read(vlen + 4)
+ else:
+ vheader = None
self._copytocurrent(ap, oidlen, tlen, dlen, vlen, h,
- oid, data)
+ oid, data, vheader)
return data, h[19:]
else:
self._trace(0x26, oid, version)
@@ -412,12 +418,13 @@ class ClientCache:
"""
if self._pos + tlen > self._limit:
return # Don't let this cause a cache flip
- assert len(header) == 27
+ assert len(header) == 27, len(header)
if header[8] == 'n':
# Rewrite the header to drop the version data.
# This shortens the record.
tlen = 31 + oidlen + dlen
vlen = 0
+ vheader = None
# (oidlen:2, reserved:6, status:1, tlen:4,
# vlen:2, dlen:4, serial:8)
header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
@@ -446,7 +453,8 @@ class ClientCache:
l.append(vdata)
l.append(vserial)
else:
- assert None is vheader is vdata is vserial
+ assert None is vheader is vdata is vserial, (
+ vlen, vheader, vdata, vserial)
l.append(header[9:13]) # copy of tlen
g = self._f[self._current]
g.seek(self._pos)
......
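The hunks above fix copying of records that carry version data from the old cache file into the current one. A minimal sketch of the header handling being assumed here, with field sizes taken from the layout comment in the second hunk; the helper names are illustrative and are not part of ClientCache's API:

    import struct

    # Fixed 27-byte cache record header, per the layout comment above:
    # oidlen:2, reserved:6, status:1, tlen:4, vlen:2, dlen:4, serial:8
    CACHE_HDR = struct.Struct(">H6scIHI8s")

    def version_header_size(h):
        # Illustrative helper: unpack the fixed header and report how many
        # extra bytes of version bookkeeping follow it.  The patch reads
        # read(vlen + 4) in that case -- the version name plus four more
        # bytes (presumably a length field) -- and forwards the result to
        # _copytocurrent() so the copied record keeps its version info.
        oidlen, _reserved, status, tlen, vlen, dlen, serial = CACHE_HDR.unpack(h)
        return (vlen + 4) if vlen else 0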
@@ -78,7 +78,7 @@ disconnected_stub = DisconnectedServerStub()
MB = 1024**2
- class ClientStorage:
+ class ClientStorage(object):
"""A Storage class that is a network client to a remote storage.
@@ -129,6 +129,7 @@ class ClientStorage:
client -- A name used to construct persistent cache filenames.
Defaults to None, in which case the cache is not persistent.
+ See ClientCache for more info.
debug -- Ignored. This is present only for backwards
compatibility with ZEO 1.
......
@@ -31,6 +31,9 @@ class CommitLog:
self.stores = 0
self.read = 0
+ def size(self):
+ return self.file.tell()
def store(self, oid, serial, data, version):
self.pickler.dump((oid, serial, data, version))
self.stores += 1
......
@@ -82,6 +82,7 @@ class ZEOStorage:
self.read_only = read_only
self.locked = 0
self.verifying = 0
+ self.store_failed = 0
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
@@ -367,6 +368,7 @@ class ZEOStorage:
self.txnlog = CommitLog()
self.tid = tid
self.status = status
+ self.store_failed = 0
self.stats.active_txns += 1
def tpc_finish(self, id):
@@ -401,9 +403,9 @@ class ZEOStorage:
self.timeout.end(self)
self.stats.lock_time = None
self.log("Transaction released storage lock")
# _handle_waiting() can start another transaction (by
# restarting a waiting one) so must be done last
self._handle_waiting()
def _abort(self):
# called when a connection is closed unexpectedly
@@ -471,12 +473,14 @@ class ZEOStorage:
self.storage.tpc_begin(txn, tid, status)
def _store(self, oid, serial, data, version):
+ err = None
try:
newserial = self.storage.store(oid, serial, data, version,
self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
+ self.store_failed = 1
if isinstance(err, ConflictError):
self.stats.conflicts += 1
if not isinstance(err, TransactionError):
@@ -503,9 +507,15 @@ class ZEOStorage:
if newserial == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.serials.append((oid, newserial))
+ return err is None
def _vote(self):
self.client.serialnos(self.serials)
+ # If a store call failed, then return to the client immediately.
+ # The serialnos() call will deliver an exception that will be
+ # handled by the client in its tpc_vote() method.
+ if self.store_failed:
+ return
return self.storage.tpc_vote(self.transaction)
def _abortVersion(self, src):
@@ -554,11 +564,18 @@ class ZEOStorage:
def _restart(self, delay=None):
# Restart when the storage lock is available.
+ if self.txnlog.stores == 1:
+ template = "Preparing to commit transaction: %d object, %d bytes"
+ else:
+ template = "Preparing to commit transaction: %d objects, %d bytes"
+ self.log(template % (self.txnlog.stores, self.txnlog.size()),
+ level=zLOG.BLATHER)
self._tpc_begin(self.transaction, self.tid, self.status)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
# load oid, serial, data, version
- self._store(*loader.load())
+ if not self._store(*loader.load()):
+ break
resp = self._thunk()
if delay is not None:
delay.reply(resp)
@@ -612,7 +629,7 @@ class StorageServer:
transaction_timeout=None,
monitor_address=None,
auth_protocol=None,
- auth_filename=None,
+ auth_database=None,
auth_realm=None):
"""StorageServer constructor.
@@ -659,7 +676,7 @@ class StorageServer:
auth_protocol -- The name of the authentication protocol to use.
Examples are "digest" and "srp".
- auth_filename -- The name of the password database filename.
+ auth_database -- The name of the password database filename.
It should be in a format compatible with the authentication
protocol used; for instance, "sha" and "srp" require different
formats.
@@ -685,7 +702,7 @@ class StorageServer:
s._waiting = []
self.read_only = read_only
self.auth_protocol = auth_protocol
- self.auth_filename = auth_filename
+ self.auth_database = auth_database
self.auth_realm = auth_realm
self.database = None
if auth_protocol:
@@ -739,7 +756,7 @@ class StorageServer:
# storages, avoiding the need to bloat each with a new authenticator
# Database that would contain the same info, and also avoiding any
# possibly synchronization issues between them.
- self.database = db_class(self.auth_filename)
+ self.database = db_class(self.auth_database)
if self.database.realm != self.auth_realm:
raise ValueError("password database realm %r "
"does not match storage realm %r"
......
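Taken together, the ZEOStorage changes above make _store() report success or failure, let _restart() stop replaying the commit log at the first failure, and have _vote() skip storage.tpc_vote() when any store failed (the comment notes the client learns of the error through the serialnos() callback). A self-contained sketch of that control flow, with illustrative names rather than the real ZEO classes:

    class TransactionReplay:
        # Sketch only: `storage` is anything with store() and tpc_vote(),
        # `records` is an iterable of (oid, serial, data, version) tuples.
        def __init__(self, storage, records):
            self.storage = storage
            self.records = records
            self.store_failed = False

        def _store(self, oid, serial, data, version):
            try:
                self.storage.store(oid, serial, data, version)
            except Exception:
                # Mirror of `self.store_failed = 1`: remember the failure and
                # tell the caller there is no point in storing anything else.
                self.store_failed = True
                return False
            return True

        def replay_and_vote(self):
            for record in self.records:
                if not self._store(*record):
                    break                  # first failure ends the replay
            if self.store_failed:
                return None                # error reaches the client elsewhere
            return self.storage.tpc_vote()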
@@ -82,8 +82,7 @@ class CommitLockTests:
# The commit lock tests verify that the storage successfully
# blocks and restarts transactions when there is contention for a
- # single storage. There are a lot of cases to cover. transaction
- # has finished.
+ # single storage. There are a lot of cases to cover.
# The general flow of these tests is to start a transaction by
# getting far enough into 2PC to acquire the commit lock. Then
......
@@ -100,6 +100,8 @@ class CommonSetupTearDown(StorageTestBase):
if getattr(self, '_storage', None) is not None:
self._storage.close()
if hasattr(self._storage, 'cleanup'):
+ zLOG.LOG("testZEO", zLOG.DEBUG, "cleanup storage %s" %
+ self._storage.__name__)
self._storage.cleanup()
for adminaddr in self._servers:
if adminaddr is not None:
@@ -141,9 +143,14 @@ class CommonSetupTearDown(StorageTestBase):
def getConfig(self, path, create, read_only):
raise NotImplementedError
- def openClientStorage(self, cache='', cache_size=200000, wait=1,
+ cache_id = 1
+ def openClientStorage(self, cache=None, cache_size=200000, wait=1,
read_only=0, read_only_fallback=0,
username=None, password=None, realm=None):
+ if cache is None:
+ cache = str(self.__class__.cache_id)
+ self.__class__.cache_id += 1
self.caches.append(cache)
storage = TestClientStorage(self.addr,
client=cache,
@@ -566,6 +573,70 @@ class ConnectionTests(CommonSetupTearDown):
db2.close()
db1.close()
class InvqTests(CommonSetupTearDown):
invq = 2
def checkQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
def checkVerificationWith2ClientsInvqOverflow(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
# the test code sets invq bound to 2
for i in range(5):
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
t = time.time() + 30
while not perstorage.end_verify.isSet():
perstorage.sync()
if time.time() > t:
self.fail("timed out waiting for endVerify")
self.assertEqual(self._storage.load(oid, '')[1], revid)
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
class ReconnectionTests(CommonSetupTearDown):
# The setUp() starts a server automatically. In order for its
# state to persist, we set the class variable keep to 1. In
@@ -688,7 +759,7 @@ class ReconnectionTests(CommonSetupTearDown):
self._newAddr()
# Start a read-only server
- self.startServer(create=0, index=0, read_only=1)
+ self.startServer(create=0, index=0, read_only=1, keep=0)
# Start a client in fallback mode
self._storage = self.openClientStorage(read_only_fallback=1)
# Stores should fail here
@@ -756,69 +827,6 @@ class ReconnectionTests(CommonSetupTearDown):
perstorage.close()
self._storage.close()
def checkQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
def checkVerificationWith2ClientsInvqOverflow(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
# the test code sets invq bound to 2
for i in range(5):
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
t = time.time() + 30
while not perstorage.end_verify.isSet():
perstorage.sync()
if time.time() > t:
self.fail("timed out waiting for endVerify")
self.assertEqual(self._storage.load(oid, '')[1], revid)
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
class TimeoutTests(CommonSetupTearDown):
timeout = 1
......
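The new InvqTests methods above wait for client cache verification to finish by polling: they repeatedly call sync() on the storage until its end_verify event is set or a deadline passes. A small standalone sketch of that wait loop, where event and poll stand in for perstorage.end_verify and perstorage.sync:

    import time

    def wait_for_verification(event, poll, timeout=30):
        # Keep pumping the client until the event fires; fail if the
        # deadline passes first, as checkVerificationWith2ClientsInvqOverflow
        # does above.
        deadline = time.time() + timeout
        while not event.isSet():
            poll()
            if time.time() > deadline:
                raise AssertionError("timed out waiting for endVerify")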
@@ -12,7 +12,6 @@
#
##############################################################################
- from thread import get_ident
import threading
import time
@@ -20,12 +19,10 @@ from BTrees.check import check, display
from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
- from ZEO.tests.ConnectionTests import CommonSetupTearDown
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
- import zLOG
# The tests here let several threads have a go at one or more database
# instances simultaneously. Each thread appends a disjoint (from the
@@ -48,8 +45,8 @@ class StressThread(TestThread):
# to 'tree' until Event stop is set. If sleep is given, sleep
# that long after each append. At the end, instance var .added_keys
# is a list of the ints the thread believes it added successfully.
- def __init__(self, testcase, db, stop, threadnum, startnum,
+ def __init__(self, testcase, db, stop, threadnum, commitdict,
- step=2, sleep=None):
+ startnum, step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
@@ -58,6 +55,7 @@ class StressThread(TestThread):
self.step = step
self.sleep = sleep
self.added_keys = []
+ self.commitdict = commitdict
def testrun(self):
cn = self.db.open()
@@ -74,6 +72,7 @@ class StressThread(TestThread):
tree[key] = self.threadnum
get_transaction().note("add key %s" % key)
get_transaction().commit()
+ self.commitdict[self] = 1
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
@@ -88,9 +87,13 @@ class StressThread(TestThread):
key += self.step
cn.close()
- class VersionStressThread(TestThread):
+ class LargeUpdatesThread(TestThread):
+ # A thread that performs a lot of updates. It attempts to modify
+ # more than 25 objects so that it can test code that runs vote
+ # in a separate thread when it modifies more than 25 objects.
- def __init__(self, testcase, db, stop, threadnum, startnum,
+ def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
@@ -100,21 +103,88 @@ class VersionStressThread(TestThread):
self.step = step
self.sleep = sleep
self.added_keys = []
+ self.commitdict = commitdict
def testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
# print "%d getting tree abort" % self.threadnum
get_transaction().abort()
cn.sync()
keys_added = {} # set of keys we commit
tkeys = []
while not self.stop.isSet():
# The test picks 50 keys spread across many buckets.
# self.startnum and self.step ensure that all threads use
# disjoint key sets, to minimize conflict errors.
nkeys = len(tkeys)
if nkeys < 50:
tkeys = range(self.startnum, 3000, self.step)
nkeys = len(tkeys)
step = max(int(nkeys / 50), 1)
keys = [tkeys[i] for i in range(0, nkeys, step)]
for key in keys:
try:
tree[key] = self.threadnum
except (ReadConflictError, ConflictError), msg:
# print "%d setting key %s" % (self.threadnum, msg)
get_transaction().abort()
cn.sync()
break
else:
# print "%d set #%d" % (self.threadnum, len(keys))
get_transaction().note("keys %s" % ", ".join(map(str, keys)))
try:
get_transaction().commit()
self.commitdict[self] = 1
if self.sleep:
time.sleep(self.sleep)
except ConflictError, msg:
# print "%d commit %s" % (self.threadnum, msg)
get_transaction().abort()
cn.sync()
continue
for k in keys:
tkeys.remove(k)
keys_added[k] = 1
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
self.added_keys = keys_added.keys()
cn.close()
+ class VersionStressThread(TestThread):
- def log(self, msg):
+ def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
- zLOG.LOG("thread %d" % get_ident(), 0, msg)
+ step=2, sleep=None):
+ TestThread.__init__(self, testcase)
+ self.db = db
+ self.stop = stop
+ self.threadnum = threadnum
+ self.startnum = startnum
+ self.step = step
+ self.sleep = sleep
+ self.added_keys = []
+ self.commitdict = commitdict
def testrun(self):
- self.log("thread begin")
commit = 0
key = self.startnum
while not self.stop.isSet():
version = "%s:%s" % (self.threadnum, key)
commit = not commit
- self.log("attempt to add key=%s version=%s commit=%d" %
- (key, version, commit))
if self.oneupdate(version, key, commit):
self.added_keys.append(key)
+ self.commitdict[self] = 1
key += self.step
def oneupdate(self, version, key, commit=1):
@@ -134,13 +204,11 @@ class VersionStressThread(TestThread):
while not self.stop.isSet():
try:
tree[key] = self.threadnum
- get_transaction().note("add key %d" % key)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
break
except (VersionLockError, ReadConflictError, ConflictError), msg:
- self.log(msg)
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
@@ -163,20 +231,30 @@ class VersionStressThread(TestThread):
time.sleep(self.sleep)
return commit
except ConflictError, msg:
- self.log(msg)
get_transaction().abort()
cn.sync()
finally:
cn.close()
return 0
- class InvalidationTests(CommonSetupTearDown):
+ class InvalidationTests:
level = 2
- DELAY = 15 # number of seconds the main thread lets the workers run
+ # Minimum # of seconds the main thread lets the workers run. The
+ # test stops as soon as this much time has elapsed, and all threads
+ # have managed to commit a change.
+ MINTIME = 10
+ # Maximum # of seconds the main thread lets the workers run. We
+ # stop after this long has elapsed regardless of whether all threads
+ # have managed to commit a change.
+ MAXTIME = 300
+ StressThread = StressThread
def _check_tree(self, cn, tree):
- # Make sure the BTree is sane and that all the updates persisted.
+ # Make sure the BTree is sane at the C level.
retries = 3
while retries:
retries -= 1
@@ -196,28 +274,46 @@ class InvalidationTests(CommonSetupTearDown):
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
- all_keys = []
+ expected_keys = []
+ errormsgs = []
+ err = errormsgs.append
for t in threads:
- # If the test didn't add any keys, it didn't do what we expected.
- self.assert_(t.added_keys)
- for key in t.added_keys:
- self.assert_(tree.has_key(key), key)
- all_keys.extend(t.added_keys)
- all_keys.sort()
- self.assertEqual(all_keys, list(tree.keys()))
- def go(self, stop, *threads):
+ if not t.added_keys:
+ err("thread %d didn't add any keys" % t.threadnum)
+ expected_keys.extend(t.added_keys)
+ expected_keys.sort()
+ actual_keys = list(tree.keys())
+ if expected_keys != actual_keys:
+ err("expected keys != actual keys")
+ for k in expected_keys:
+ if k not in actual_keys:
+ err("key %s expected but not in tree" % k)
+ for k in actual_keys:
+ if k not in expected_keys:
+ err("key %s in tree but not expected" % k)
+ if errormsgs:
+ display(tree)
+ self.fail('\n'.join(errormsgs))
+ def go(self, stop, commitdict, *threads):
# Run the threads
for t in threads:
t.start()
- time.sleep(self.DELAY)
+ delay = self.MINTIME
+ start = time.time()
+ while time.time() - start <= self.MAXTIME:
+ time.sleep(delay)
+ delay = 2.0
+ if len(commitdict) >= len(threads):
+ break
+ # Some thread still hasn't managed to commit anything.
stop.set()
for t in threads:
t.cleanup()
def checkConcurrentUpdates2Storages(self):
self._storage = storage1 = self.openClientStorage()
- storage2 = self.openClientStorage(cache="2")
+ storage2 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(storage2)
stop = threading.Event()
@@ -227,9 +323,10 @@ class InvalidationTests(CommonSetupTearDown):
get_transaction().commit()
# Run two threads that update the BTree
- t1 = StressThread(self, db1, stop, 1, 1)
- t2 = StressThread(self, db2, stop, 2, 2)
- self.go(stop, t1, t2)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1)
+ t2 = self.StressThread(self, db2, stop, 2, cd, 2)
+ self.go(stop, cd, t1, t2)
cn.sync()
self._check_tree(cn, tree)
@@ -249,9 +346,10 @@ class InvalidationTests(CommonSetupTearDown):
get_transaction().commit()
# Run two threads that update the BTree
- t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
- t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
- self.go(stop, t1, t2)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1, sleep=0.001)
+ t2 = self.StressThread(self, db1, stop, 2, cd, 2, sleep=0.001)
+ self.go(stop, cd, t1, t2)
cn.sync()
self._check_tree(cn, tree)
@@ -263,22 +361,23 @@ class InvalidationTests(CommonSetupTearDown):
def checkConcurrentUpdates2StoragesMT(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
+ db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
- db2 = DB(self.openClientStorage(cache="2"))
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
- t1 = StressThread(self, db1, stop, 1, 1, 3)
- t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
- t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
- self.go(stop, t1, t2, t3)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1, 3)
+ t2 = self.StressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+ t3 = self.StressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+ self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
@@ -291,7 +390,7 @@ class InvalidationTests(CommonSetupTearDown):
def checkConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
- db2 = DB(self.openClientStorage(cache="2"))
+ db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
@@ -303,10 +402,11 @@ class InvalidationTests(CommonSetupTearDown):
# is possible for both threads to read the same object
# at the same time.
- t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
- t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
- t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
- self.go(stop, t1, t2, t3)
+ cd = {}
+ t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
+ t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+ t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+ self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
@@ -316,3 +416,41 @@ class InvalidationTests(CommonSetupTearDown):
db1.close()
db2.close()
def checkConcurrentLargeUpdates(self):
# Use 3 threads like the 2StorageMT test above.
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
for i in range(0, 3000, 2):
tree[i] = 0
get_transaction().commit()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
cd = {}
t1 = LargeUpdatesThread(self, db1, stop, 1, cd, 1, 3, 0.001)
t2 = LargeUpdatesThread(self, db2, stop, 2, cd, 2, 3, 0.001)
t3 = LargeUpdatesThread(self, db2, stop, 3, cd, 3, 3, 0.001)
self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
# Purge the tree of the dummy entries mapping to 0.
losers = [k for k, v in tree.items() if v == 0]
for k in losers:
del tree[k]
get_transaction().commit()
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
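The rewritten go() above replaces the fixed DELAY sleep with a commit-driven stopping rule: each worker marks itself in a shared dict after every successful commit, and the main thread waits at least MINTIME seconds, then keeps checking (up to MAXTIME) until every thread appears in that dict. A plain dict works as the signal because each thread only writes its own key, so the dict's length is the number of threads that have committed at least once. A standalone sketch of the same policy, with illustrative names:

    import time

    def run_until_all_committed(stop, commitdict, threads,
                                mintime=10, maxtime=300):
        # Sketch of InvalidationTests.go(): start the workers, give them at
        # least `mintime` seconds, then poll every couple of seconds until
        # every thread has recorded a commit or `maxtime` elapses.
        for t in threads:
            t.start()
        delay = mintime
        start = time.time()
        while time.time() - start <= maxtime:
            time.sleep(delay)
            delay = 2.0
            if len(commitdict) >= len(threads):
                break
        stop.set()
        for t in threads:
            t.join()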
@@ -32,14 +32,12 @@ class ClientCacheTests(unittest.TestCase):
_oid3 = 'cdefghij'
def setUp(self):
- unittest.TestCase.setUp(self)
self.cachesize = 10*1000*1000
self.cache = ClientCache(size=self.cachesize)
self.cache.open()
def tearDown(self):
self.cache.close()
- unittest.TestCase.tearDown(self)
def testOpenClose(self):
pass # All the work is done by setUp() / tearDown()
@@ -281,9 +279,10 @@ class ClientCacheTests(unittest.TestCase):
class PersistentClientCacheTests(unittest.TestCase):
_oid = 'abcdefgh'
+ _oid2 = 'bcdefghi'
+ _oid3 = 'cdefghij'
def setUp(self):
- unittest.TestCase.setUp(self)
self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
self.cachesize = 10*1000*1000
self.storagename = 'foo'
@@ -319,7 +318,6 @@ class PersistentClientCacheTests(unittest.TestCase):
os.unlink(filename)
except os.error:
pass
- unittest.TestCase.tearDown(self)
def testCacheFileSelection(self):
# A bug in __init__ read the wrong slice of the file to determine
@@ -388,7 +386,42 @@ class PersistentClientCacheTests(unittest.TestCase):
cache.checkSize(10*self.cachesize) # Force a file flip
self.failUnless(cache.getLastTid() is None)
def testLoadNonversionWithVersionInFlippedCache(self):
# This test provokes an error seen once in an unrelated test.
# The object is stored in the old cache file with version data,
# a load for non-version data occurs. The attempt to copy the
# non-version data to the new file fails.
nvdata = "Mend your speech a little, lest it may mar your fortunes."
nvserial = "12345678"
version = "folio"
vdata = "Mend your speech a little, lest you may mar your fortunes."
vserial = "12346789"
self.cache.store(self._oid, nvdata, nvserial, version, vdata, vserial)
self.cache.checkSize(10 * self.cachesize) # force a cache flip
for i in 1, 2: # check the we can load before and after copying
for xversion, xdata, xserial in [("", nvdata, nvserial),
(version, vdata, vserial)]:
data, serial = self.cache.load(self._oid, xversion)
self.assertEqual(data, xdata)
self.assertEqual(serial, xserial)
# now cause two more cache flips and make sure the data is still there
self.cache.store(self._oid2, "", "", "foo", "bar", "23456789")
self.cache.checkSize(10 * self.cachesize) # force a cache flip
self.cache.load(self._oid, "")
self.cache.store(self._oid3, "bar", "34567890", "", "", "")
self.cache.checkSize(10 * self.cachesize) # force a cache flip
self.cache.load(self._oid, "")
for i in 1, 2: # check the we can load before and after copying
for xversion, xdata, xserial in [("", nvdata, nvserial),
(version, vdata, vserial)]:
data, serial = self.cache.load(self._oid, xversion)
self.assertEqual(data, xdata)
self.assertEqual(serial, xserial)
class ClientCacheLongOIDTests(ClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
_oid2 = 'bcdefghijklmnopq' * 2
@@ -397,7 +430,8 @@ class ClientCacheLongOIDTests(ClientCacheTests):
class PersistentClientCacheLongOIDTests(PersistentClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
+ _oid2 = 'bcdefghijklmnopq' * 2
+ _oid3 = 'cdefghijklmnopqr' * 2
def test_suite():
suite = unittest.TestSuite()
......
@@ -38,7 +38,7 @@ class BerkeleyStorageConfig:
def getConfig(self, path, create, read_only):
return """\
<fullstorage 1>
- name %s
+ envdir %s
read-only %s
</fullstorage>""" % (path, read_only and "yes" or "no")
@@ -57,19 +57,25 @@ class FileStorageConnectionTests(
class FileStorageReconnectionTests(
FileStorageConfig,
- ConnectionTests.ReconnectionTests
+ ConnectionTests.ReconnectionTests,
):
"""FileStorage-specific re-connection tests."""
# Run this at level 1 because MappingStorage can't do reconnection tests
level = 1
+ class FileStorageInvqTests(
+ FileStorageConfig,
+ ConnectionTests.InvqTests
+ ):
+ """FileStorage-specific invalidation queue tests."""
+ level = 1
class FileStorageTimeoutTests(
FileStorageConfig,
ConnectionTests.TimeoutTests
):
level = 2
class BDBConnectionTests(
BerkeleyStorageConfig,
ConnectionTests.ConnectionTests,
@@ -85,6 +91,13 @@ class BDBReconnectionTests(
"""Berkeley storage re-connection tests."""
level = 2
+ class BDBInvqTests(
+ BerkeleyStorageConfig,
+ ConnectionTests.InvqTests
+ ):
+ """Berkeley storage invalidation queue tests."""
+ level = 2
class BDBTimeoutTests(
BerkeleyStorageConfig,
ConnectionTests.TimeoutTests
@@ -112,22 +125,19 @@ class MappingStorageTimeoutTests(
test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests,
+ FileStorageInvqTests,
FileStorageTimeoutTests,
MappingStorageConnectionTests,
MappingStorageTimeoutTests]
import BDBStorage
if BDBStorage.is_available:
- test_classes.append(BDBConnectionTests)
- test_classes.append(BDBReconnectionTests)
- test_classes.append(BDBTimeoutTests)
+ test_classes += [BDBConnectionTests,
+ BDBReconnectionTests,
+ BDBInvqTests,
+ BDBTimeoutTests]
def test_suite():
- # shutup warnings about mktemp
- import warnings
- warnings.filterwarnings("ignore", "mktemp")
suite = unittest.TestSuite()
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
......
@@ -187,7 +187,7 @@ class BDBTests(FileStorageTests):
self._envdir = tempfile.mktemp()
return """\
<fullstorage 1>
- name %s
+ envdir %s
</fullstorage>
""" % self._envdir
......
@@ -122,9 +122,9 @@ class Suicide(threading.Thread):
self._adminaddr = addr
def run(self):
- # If this process doesn't exit in 100 seconds, commit suicide
- for i in range(20):
- time.sleep(5)
+ # If this process doesn't exit in 300 seconds, commit suicide
+ time.sleep(300)
+ log("zeoserver", "suicide thread invoking shutdown")
from ZEO.tests.forker import shutdown_zeo_server
# XXX If the -k option was given to zeoserver, then the process will
# go away but the temp files won't get cleaned up.
@@ -174,7 +174,7 @@ def main():
transaction_timeout=zo.transaction_timeout,
monitor_address=mon_addr,
auth_protocol=zo.auth_protocol,
- auth_filename=zo.auth_database,
+ auth_database=zo.auth_database,
auth_realm=zo.auth_realm)
try:
......
@@ -28,7 +28,7 @@ from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection
- class ConnectionManager:
+ class ConnectionManager(object):
"""Keeps a connection up over time"""
def __init__(self, addrs, client, tmin=1, tmax=180):
......
@@ -67,7 +67,7 @@ class MTDelay(Delay):
self.ready.wait()
Delay.error(self, exc_info)
- class Connection(smac.SizedMessageAsyncConnection):
+ class Connection(smac.SizedMessageAsyncConnection, object):
"""Dispatcher for RPC on object on both sides of socket.
The connection supports synchronous calls, which expect a return,
......
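Several classes in this commit gain object as an extra base (ClientStorage, ConnectionManager, the zrpc Connection, and Connection and DB in ZODB), turning them into new-style classes under Python 2. The commit message does not say why, but one practical consequence is that cooperative super() calls only work in hierarchies rooted in object, as this small illustration shows:

    class Base(object):
        def ping(self):
            return "base"

    class Child(Base):
        def ping(self):
            # super() raises TypeError for classic classes in Python 2;
            # rooting the hierarchy in `object` makes it usable.
            return "child+" + super(Child, self).ping()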
@@ -13,24 +13,29 @@
##############################################################################
"""Handy standard storage machinery
- $Id: BaseStorage.py,v 1.34 2003/06/10 15:46:31 shane Exp $
+ $Id: BaseStorage.py,v 1.35 2003/09/15 16:29:15 jeremy Exp $
"""
import cPickle
- import ThreadLock, bpthread
- import time, UndoLogCompatible
- import POSException
- from TimeStamp import TimeStamp
- z64='\0'*8
- class BaseStorage(UndoLogCompatible.UndoLogCompatible):
+ import time
+ import ThreadLock
+ import zLOG
+ from ZODB import bpthread
+ from ZODB import POSException
+ from ZODB.TimeStamp import TimeStamp
+ from ZODB.UndoLogCompatible import UndoLogCompatible
+ from ZODB.utils import z64
+ class BaseStorage(UndoLogCompatible):
_transaction=None # Transaction that is being committed
_serial=z64 # Transaction serial number
_tstatus=' ' # Transaction status, used for copying data
_is_read_only = 0
def __init__(self, name, base=None):
- self.__name__=name
+ self.__name__= name
+ zLOG.LOG(self.__class__.__name__, zLOG.DEBUG,
+ "create storage %s" % self.__name__)
# Allocate locks:
l=ThreadLock.allocate_lock()
......
@@ -13,7 +13,7 @@
##############################################################################
"""Database connection support
- $Id: Connection.py,v 1.98 2003/06/13 21:53:08 jeremy Exp $"""
+ $Id: Connection.py,v 1.99 2003/09/15 16:29:15 jeremy Exp $"""
from __future__ import nested_scopes
@@ -47,7 +47,7 @@ def updateCodeTimestamp():
ExtensionKlass = Base.__class__
- class Connection(ExportImport.ExportImport):
+ class Connection(ExportImport.ExportImport, object):
"""Object managers for individual object space.
An object space is a version of collection of objects. In a
@@ -136,11 +136,10 @@ class Connection(ExportImport.ExportImport):
# Explicitly remove references from the connection to its
# cache and to the root object, because they refer back to the
# connection.
- self._cache.clear()
- self._cache = None
+ if self._cache is not None:
+ self._cache.clear()
self._incrgc = None
self.cacheGC = None
- self._root_ = None
def __getitem__(self, oid, tt=type(())):
obj = self._cache.get(oid, None)
@@ -176,8 +175,6 @@ class Connection(ExportImport.ExportImport):
object._p_serial=serial
self._cache[oid] = object
- if oid=='\0\0\0\0\0\0\0\0':
- self._root_=object # keep a ref
return object
def _persistent_load(self,oid,
@@ -279,7 +276,8 @@ class Connection(ExportImport.ExportImport):
self.__onCloseCallbacks.append(f)
def close(self):
- self._incrgc() # This is a good time to do some GC
+ if self._incrgc is not None:
+ self._incrgc() # This is a good time to do some GC
# Call the close callbacks.
if self.__onCloseCallbacks is not None:
......
@@ -13,8 +13,8 @@
##############################################################################
"""Database objects
- $Id: DB.py,v 1.53 2003/06/24 21:50:18 jeremy Exp $"""
+ $Id: DB.py,v 1.54 2003/09/15 16:29:15 jeremy Exp $"""
- __version__='$Revision: 1.53 $'[11:-2]
+ __version__='$Revision: 1.54 $'[11:-2]
import cPickle, cStringIO, sys, POSException, UndoLogCompatible
from Connection import Connection
@@ -32,7 +32,7 @@ def list2dict(L):
d[elt] = 1
return d
- class DB(UndoLogCompatible.UndoLogCompatible):
+ class DB(UndoLogCompatible.UndoLogCompatible, object):
"""The Object Database
The Object database coordinates access to and interaction of one
......
@@ -79,7 +79,7 @@ method::
and call it to monitor the storage.
"""
- __version__='$Revision: 1.19 $'[11:-2]
+ __version__='$Revision: 1.20 $'[11:-2]
import base64, time, string
from ZODB import POSException, BaseStorage, utils
......
@@ -115,7 +115,7 @@
# may have a back pointer to a version record or to a non-version
# record.
#
- __version__='$Revision: 1.136 $'[11:-2]
+ __version__='$Revision: 1.137 $'[11:-2]
import base64
from cPickle import Pickler, Unpickler, loads
@@ -157,17 +157,21 @@ DATA_VERSION_HDR_LEN = 58
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
+ def blather(message, *data):
+ LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
+ message % data))
def warn(message, *data):
LOG('ZODB FS', WARNING, "%s warn: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def error(message, *data):
LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def nearPanic(message, *data):
LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def panic(message, *data):
message = message % data
@@ -234,8 +238,10 @@ class FileStorage(BaseStorage.BaseStorage,
BaseStorage.BaseStorage.__init__(self, file_name)
- index, vindex, tindex, tvindex = self._newIndexes()
- self._initIndex(index, vindex, tindex, tvindex)
+ (index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete) = self._newIndexes()
+ self._initIndex(index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete)
# Now open the file
@@ -269,7 +275,8 @@ class FileStorage(BaseStorage.BaseStorage,
self._used_index = 1 # Marker for testing
index, vindex, start, maxoid, ltid = r
- self._initIndex(index, vindex, tindex, tvindex)
+ self._initIndex(index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete)
self._pos, self._oid, tid = read_index(
self._file, file_name, index, vindex, tindex, stop,
ltid=ltid, start=start, maxoid=maxoid,
@@ -302,7 +309,11 @@ class FileStorage(BaseStorage.BaseStorage,
self._quota = quota
+ # Serialno cache statistics.
+ self._oid2serial_nlookups = self._oid2serial_nhits = 0
- def _initIndex(self, index, vindex, tindex, tvindex):
+ def _initIndex(self, index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete):
self._index=index
self._vindex=vindex
self._tindex=tindex
@@ -310,12 +321,33 @@ class FileStorage(BaseStorage.BaseStorage,
self._index_get=index.get
self._vindex_get=vindex.get
# .store() needs to compare the passed-in serial to the current
# serial in the database. _oid2serial caches the oid -> current
# serial mapping for non-version data (if the current record for
# oid is version data, the oid is not a key in _oid2serial).
# The point is that otherwise seeking into the storage is needed
# to extract the current serial, and that's an expensive operation.
# For example, if a transaction stores 4000 objects, and each
# random seek + read takes 7ms (that was approximately true on
# Linux and Windows tests in mid-2003), that's 28 seconds just to
# find the old serials.
# XXX Probably better to junk this and redefine _index as mapping
# XXX oid to (offset, serialno) pair, via a new memory-efficient
# XXX BTree type.
self._oid2serial = oid2serial
# oid->serialno map to transactionally add to _oid2serial.
self._toid2serial = toid2serial
# Set of oids to transactionally delete from _oid2serial (e.g.,
# oids reverted by undo, or for which the most recent record
# becomes version data).
self._toid2serial_delete = toid2serial_delete
def __len__(self):
return len(self._index)
def _newIndexes(self):
# hook to use something other than builtin dict
- return fsIndex(), {}, {}, {}
+ return fsIndex(), {}, {}, {}, {}, {}, {}
_saved = 0
def _save_index(self):
@@ -483,6 +515,31 @@ class FileStorage(BaseStorage.BaseStorage,
# XXX should log the error, though
pass # We don't care if this fails.
# Return serial number of most recent record for oid if that's in
# the _oid2serial cache. Else return None. It's important to use
# this instead of indexing _oid2serial directly so that cache
# statistics can be logged.
def _get_cached_serial(self, oid):
self._oid2serial_nlookups += 1
result = self._oid2serial.get(oid)
if result is not None:
self._oid2serial_nhits += 1
# Log a msg every ~8000 tries, and prevent overflow.
if self._oid2serial_nlookups & 0x1fff == 0:
if self._oid2serial_nlookups >> 30:
# In older Pythons, we may overflow if we keep it an int.
self._oid2serial_nlookups = long(self._oid2serial_nlookups)
self._oid2serial_nhits = long(self._oid2serial_nhits)
blather("_oid2serial size %s lookups %s hits %s rate %.1f%%",
len(self._oid2serial),
self._oid2serial_nlookups,
self._oid2serial_nhits,
100.0 * self._oid2serial_nhits /
self._oid2serial_nlookups)
return result
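The comments above describe the new serial-number cache: _oid2serial maps an oid to its current non-version serial, while _toid2serial and _toid2serial_delete collect per-transaction additions and removals that are only folded in when the transaction finishes (and simply cleared in _clear_temp on abort). A minimal sketch of that bookkeeping, with illustrative names rather than FileStorage's own attributes:

    class SerialCache:
        # current     ~ _oid2serial
        # pending_add ~ _toid2serial
        # pending_del ~ _toid2serial_delete
        def __init__(self):
            self.current = {}
            self.pending_add = {}
            self.pending_del = {}

        def record_store(self, oid, serial, is_version_data):
            if is_version_data:
                # The most recent record is now version data, so the oid
                # must drop out of the non-version cache when we commit.
                self.pending_del[oid] = 1
            else:
                self.pending_add[oid] = serial

        def abort(self):
            self.pending_add.clear()
            self.pending_del.clear()

        def finish(self):
            # Mirror of the tpc-finish hunk later in this file's diff:
            # apply additions, then deletions, then reset per-txn state.
            self.current.update(self.pending_add)
            for oid in list(self.pending_del):
                self.current.pop(oid, None)
            self.abort()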
def abortVersion(self, src, transaction):
return self.commitVersion(src, '', transaction, abort=1)
@@ -585,33 +642,11 @@ class FileStorage(BaseStorage.BaseStorage,
spos = h[-8:]
srcpos = u64(spos)
+ self._toid2serial_delete.update(current_oids)
return oids
- def getSize(self): return self._pos
+ def getSize(self):
+ return self._pos
def _loada(self, oid, _index, file):
"Read any version and return the version"
try:
pos=_index[oid]
except KeyError:
raise POSKeyError(oid)
except TypeError:
raise TypeError, 'invalid oid %r' % (oid,)
file.seek(pos)
read=file.read
h=read(DATA_HDR_LEN)
doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
if vlen:
nv = u64(read(8))
read(8) # Skip previous version record pointer
version = read(vlen)
else:
version = ''
nv = 0
if plen != z64:
return read(u64(plen)), version, nv
return _loadBack(file, oid, read(8))[0], version, nv
def _load(self, oid, version, _index, file):
try:
@@ -632,6 +667,10 @@ class FileStorage(BaseStorage.BaseStorage,
(read(8) # skip past version link
and version != read(vlen))):
return _loadBack(file, oid, pnv)
+ else:
+ # The most recent record is for non-version data -- cache
+ # the serialno.
+ self._oid2serial[oid] = serial
# If we get here, then either this was not a version record,
# or we've already read past the version data!
@@ -713,20 +752,25 @@ class FileStorage(BaseStorage.BaseStorage,
self._lock_acquire()
try:
- old=self._index_get(oid, 0)
- pnv=None
+ old = self._index_get(oid, 0)
+ cached_serial = None
+ pnv = None
if old:
- self._file.seek(old)
- h=self._file.read(DATA_HDR_LEN)
- doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
- if doid != oid: raise CorruptedDataError(h)
- if vlen:
- pnv=self._file.read(8) # non-version data pointer
- self._file.read(8) # skip past version link
- locked_version=self._file.read(vlen)
- if version != locked_version:
- raise POSException.VersionLockError, (
- `oid`, locked_version)
+ cached_serial = self._get_cached_serial(oid)
+ if cached_serial is None:
+ self._file.seek(old)
+ h=self._file.read(DATA_HDR_LEN)
+ doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
+ if doid != oid: raise CorruptedDataError(h)
+ if vlen:
+ pnv=self._file.read(8) # non-version data pointer
+ self._file.read(8) # skip past version link
+ locked_version=self._file.read(vlen)
+ if version != locked_version:
+ raise POSException.VersionLockError, (
+ `oid`, locked_version)
+ else:
+ oserial = cached_serial
if serial != oserial:
data = self.tryToResolveConflict(oid, oserial, serial,
...@@ -749,14 +793,19 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -749,14 +793,19 @@ class FileStorage(BaseStorage.BaseStorage,
) )
) )
if version: if version:
if pnv: write(pnv) if pnv:
else: write(p64(old)) write(pnv)
else:
write(p64(old))
# Link to last record for this version: # Link to last record for this version:
tvindex=self._tvindex tvindex=self._tvindex
pv=tvindex.get(version, 0) or self._vindex_get(version, 0) pv=tvindex.get(version, 0) or self._vindex_get(version, 0)
write(p64(pv)) write(p64(pv))
tvindex[version]=here tvindex[version]=here
write(version) write(version)
self._toid2serial_delete[oid] = 1
else:
self._toid2serial[oid] = newserial
write(data) write(data)
...@@ -875,7 +924,11 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -875,7 +924,11 @@ class FileStorage(BaseStorage.BaseStorage,
self._tfile.write(p64(pv)) self._tfile.write(p64(pv))
self._tvindex[version] = here self._tvindex[version] = here
self._tfile.write(version) self._tfile.write(version)
# And finally, write the data or a backpointer self._toid2serial_delete[oid] = 1
else:
self._toid2serial[oid] = serial
# Finally, write the data or a backpointer.
if data is None: if data is None:
if prev_pos: if prev_pos:
self._tfile.write(p64(prev_pos)) self._tfile.write(p64(prev_pos))
...@@ -940,6 +993,8 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -940,6 +993,8 @@ class FileStorage(BaseStorage.BaseStorage,
def _clear_temp(self): def _clear_temp(self):
self._tindex.clear() self._tindex.clear()
self._tvindex.clear() self._tvindex.clear()
self._toid2serial.clear()
self._toid2serial_delete.clear()
if self._tfile is not None: if self._tfile is not None:
self._tfile.seek(0) self._tfile.seek(0)
...@@ -1023,6 +1078,12 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1023,6 +1078,12 @@ class FileStorage(BaseStorage.BaseStorage,
self._index.update(self._tindex) self._index.update(self._tindex)
self._vindex.update(self._tvindex) self._vindex.update(self._tvindex)
self._oid2serial.update(self._toid2serial)
for oid in self._toid2serial_delete.keys():
try:
del self._oid2serial[oid]
except KeyError:
pass
# Update the number of records that we've written # Update the number of records that we've written
# +1 for the transaction record # +1 for the transaction record
...@@ -1090,21 +1151,28 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1090,21 +1151,28 @@ class FileStorage(BaseStorage.BaseStorage,
def getSerial(self, oid): def getSerial(self, oid):
self._lock_acquire() self._lock_acquire()
try: try:
try: result = self._get_cached_serial(oid)
return self._getSerial(oid, self._index[oid]) if result is None:
except KeyError: try:
raise POSKeyError(oid) result = self._getSerial(oid, self._index[oid])
except TypeError: except KeyError:
raise TypeError, 'invalid oid %r' % (oid,) raise POSKeyError(oid)
except TypeError:
raise TypeError, 'invalid oid %r' % (oid,)
return result
finally: finally:
self._lock_release() self._lock_release()
def _getSerial(self, oid, pos): def _getSerial(self, oid, pos):
self._file.seek(pos) self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN) h = self._file.read(16)
if len(h) < 16:
raise CorruptedDataError(h)
h += self._file.read(26) # get rest of header
if h[:8] != oid:
raise CorruptedDataError(h)
oid2, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h) oid2, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
assert oid == oid2 if splen == z64:
if splen==z64:
# a back pointer # a back pointer
bp = self._file.read(8) bp = self._file.read(8)
if bp == z64: if bp == z64:
...@@ -1243,6 +1311,10 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1243,6 +1311,10 @@ class FileStorage(BaseStorage.BaseStorage,
tpos = self._txn_find(tid, 1) tpos = self._txn_find(tid, 1)
tindex = self._txn_undo_write(tpos) tindex = self._txn_undo_write(tpos)
self._tindex.update(tindex) self._tindex.update(tindex)
# Arrange to clear the affected oids from the oid2serial cache.
# It's too painful to try to update them to correct current
# values instead.
self._toid2serial_delete.update(tindex)
return tindex.keys() return tindex.keys()
def _txn_find(self, tid, stop_at_pack): def _txn_find(self, tid, stop_at_pack):
...@@ -1500,7 +1572,9 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1500,7 +1572,9 @@ class FileStorage(BaseStorage.BaseStorage,
# OK, we're beyond the point of no return # OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name) os.rename(self._file_name + '.pack', self._file_name)
self._file = open(self._file_name, 'r+b') self._file = open(self._file_name, 'r+b')
self._initIndex(p.index, p.vindex, p.tindex, p.tvindex) self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
p.oid2serial, p.toid2serial,
p.toid2serial_delete)
self._pos = opos self._pos = opos
self._save_index() self._save_index()
finally: finally:
...@@ -1526,20 +1600,9 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1526,20 +1600,9 @@ class FileStorage(BaseStorage.BaseStorage,
if it is a new object -- return None. if it is a new object -- return None.
""" """
try: try:
pos = self._index[oid] return self.getSerial(oid)
except KeyError: except KeyError:
return None return None
except TypeError:
raise TypeError, 'invalid oid %r' % (oid,)
self._file.seek(pos)
# first 8 bytes are oid, second 8 bytes are serialno
h = self._file.read(16)
if len(h) < 16:
raise CorruptedDataError(h)
if h[:8] != oid:
h = h + self._file.read(26) # get rest of header
raise CorruptedDataError(h)
return h[8:]
def cleanup(self): def cleanup(self):
"""Remove all files created by this storage.""" """Remove all files created by this storage."""
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
static char TimeStamp_module_documentation[] = static char TimeStamp_module_documentation[] =
"Defines 64-bit TimeStamp objects used as ZODB serial numbers.\n" "Defines 64-bit TimeStamp objects used as ZODB serial numbers.\n"
"\n" "\n"
"\n$Id: TimeStamp.c,v 1.19 2003/06/20 18:38:24 tim_one Exp $\n"; "\n$Id: TimeStamp.c,v 1.20 2003/09/15 16:29:15 jeremy Exp $\n";
#ifdef USE_EXTENSION_CLASS #ifdef USE_EXTENSION_CLASS
#include "ExtensionClass.h" #include "ExtensionClass.h"
......
...@@ -47,7 +47,7 @@ ...@@ -47,7 +47,7 @@
--> -->
<sectiontype name="fullstorage" datatype=".BDBFullStorage" <sectiontype name="fullstorage" datatype=".BDBFullStorage"
implements="ZODB.storage"> implements="ZODB.storage">
<key name="name" required="yes" /> <key name="envdir" required="yes" />
<key name="interval" datatype="time-interval" default="2m" /> <key name="interval" datatype="time-interval" default="2m" />
<key name="kbyte" datatype="integer" default="0" /> <key name="kbyte" datatype="integer" default="0" />
<key name="min" datatype="integer" default="0" /> <key name="min" datatype="integer" default="0" />
...@@ -55,7 +55,7 @@ ...@@ -55,7 +55,7 @@
<key name="cachesize" datatype="byte-size" default="128MB" /> <key name="cachesize" datatype="byte-size" default="128MB" />
<key name="frequency" datatype="time-interval" default="0" /> <key name="frequency" datatype="time-interval" default="0" />
<key name="packtime" datatype="time-interval" default="4h" /> <key name="packtime" datatype="time-interval" default="4h" />
<key name="classicpack" datatype="integer" default="0" /> <key name="gcpack" datatype="integer" default="0" />
<key name="read-only" datatype="boolean" default="off"/> <key name="read-only" datatype="boolean" default="off"/>
</sectiontype> </sectiontype>
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Open database and storage from a configuration. """Open database and storage from a configuration.
$Id: config.py,v 1.13 2003/06/16 14:51:49 jeremy Exp $""" $Id: config.py,v 1.14 2003/09/15 16:29:15 jeremy Exp $"""
import os import os
from cStringIO import StringIO from cStringIO import StringIO
...@@ -157,7 +157,7 @@ class BDBStorage(BaseConfig): ...@@ -157,7 +157,7 @@ class BDBStorage(BaseConfig):
if name.startswith('_'): if name.startswith('_'):
continue continue
setattr(bconf, name, getattr(self.config, name)) setattr(bconf, name, getattr(self.config, name))
return storageclass(self.config.name, config=bconf) return storageclass(self.config.envdir, config=bconf)
class BDBMinimalStorage(BDBStorage): class BDBMinimalStorage(BDBStorage):
......
...@@ -33,7 +33,7 @@ import struct ...@@ -33,7 +33,7 @@ import struct
from types import StringType from types import StringType
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from ZODB.utils import p64, u64, z64 from ZODB.utils import p64, u64, z64, oid_repr
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
try: try:
...@@ -54,7 +54,7 @@ class CorruptedDataError(CorruptedError): ...@@ -54,7 +54,7 @@ class CorruptedDataError(CorruptedError):
def __str__(self): def __str__(self):
if self.oid: if self.oid:
msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid), msg = "Error reading oid %s. Found %r" % (oid_repr(self.oid),
self.buf) self.buf)
else: else:
msg = "Error reading unknown oid. Found %r" % self.buf msg = "Error reading unknown oid. Found %r" % self.buf
...@@ -166,7 +166,7 @@ class FileStorageFormatter: ...@@ -166,7 +166,7 @@ class FileStorageFormatter:
def checkTxn(self, th, pos): def checkTxn(self, th, pos):
if th.tid <= self.ltid: if th.tid <= self.ltid:
self.fail(pos, "time-stamp reduction: %s <= %s", self.fail(pos, "time-stamp reduction: %s <= %s",
_fmt_oid(th.tid), _fmt_oid(self.ltid)) oid_repr(th.tid), oid_repr(self.ltid))
self.ltid = th.tid self.ltid = th.tid
if th.status == "c": if th.status == "c":
self.fail(pos, "transaction with checkpoint flag set") self.fail(pos, "transaction with checkpoint flag set")
...@@ -647,11 +647,15 @@ class FileStoragePacker(FileStorageFormatter): ...@@ -647,11 +647,15 @@ class FileStoragePacker(FileStorageFormatter):
# vindex: version -> pos of XXX # vindex: version -> pos of XXX
# tindex: oid -> pos, for current txn # tindex: oid -> pos, for current txn
# tvindex: version -> pos of XXX, for current txn # tvindex: version -> pos of XXX, for current txn
# oid2serial: not used by the packer
self.index = fsIndex() self.index = fsIndex()
self.vindex = {} self.vindex = {}
self.tindex = {} self.tindex = {}
self.tvindex = {} self.tvindex = {}
self.oid2serial = {}
self.toid2serial = {}
self.toid2serial_delete = {}
# Index for non-version data. This is a temporary structure # Index for non-version data. This is a temporary structure
# to reduce I/O during packing # to reduce I/O during packing
...@@ -757,7 +761,7 @@ class FileStoragePacker(FileStorageFormatter): ...@@ -757,7 +761,7 @@ class FileStoragePacker(FileStorageFormatter):
If any data records are copied, also write txn header (th). If any data records are copied, also write txn header (th).
""" """
copy = 0 copy = 0
new_tpos = 0 new_tpos = 0L
tend = pos + th.tlen tend = pos + th.tlen
pos += th.headerlen() pos += th.headerlen()
while pos < tend: while pos < tend:
......
...@@ -381,6 +381,60 @@ class PackableStorage(PackableStorageBase): ...@@ -381,6 +381,60 @@ class PackableStorage(PackableStorageBase):
eq(root['obj'].value, 7) eq(root['obj'].value, 7)
def _PackWhileWriting(self, pack_now=0):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
if pack_now:
db.pack(time.time())
else:
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# Iterate over the storage to make sure it's sane, but not every
# storage supports iterators.
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=0)
def checkPackNowWhileWriting(self):
self._PackWhileWriting(pack_now=1)
def checkPackUndoLog(self): def checkPackUndoLog(self):
self._initroot() self._initroot()
eq = self.assertEqual eq = self.assertEqual
...@@ -450,47 +504,6 @@ class PackableStorage(PackableStorageBase): ...@@ -450,47 +504,6 @@ class PackableStorage(PackableStorageBase):
for r in self._storage.undoLog(): print r for r in self._storage.undoLog(): print r
# what can we assert about that? # what can we assert about that?
def checkPackWhileWriting(self):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# iterator over the storage to make sure it's sane
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
class ClientThread(threading.Thread): class ClientThread(threading.Thread):
def __init__(self, db): def __init__(self, db):
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
from ZODB.Transaction import Transaction from ZODB.Transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB from ZODB import DB
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
...@@ -154,3 +154,31 @@ class RecoveryStorage(IteratorDeepCompare): ...@@ -154,3 +154,31 @@ class RecoveryStorage(IteratorDeepCompare):
it.close() it.close()
self._dst.tpc_vote(final) self._dst.tpc_vote(final)
self._dst.tpc_finish(final) self._dst.tpc_finish(final)
def checkPackWithGCOnDestinationAfterRestore(self):
raises = self.assertRaises
db = DB(self._storage)
conn = db.open()
root = conn.root()
root.obj = obj1 = MinPO(1)
txn = get_transaction()
txn.note('root -> obj')
txn.commit()
root.obj.obj = obj2 = MinPO(2)
txn = get_transaction()
txn.note('root -> obj -> obj')
txn.commit()
del root.obj
txn = get_transaction()
txn.note('root -X->')
txn.commit()
# Now copy the transactions to the destination
self._dst.copyTransactionsFrom(self._storage)
# Now pack the destination.
snooze()
self._dst.pack(time.time(), referencesf)
# And check to see that the root object exists, but not the other
# objects.
data, serial = self._dst.load(root._p_oid, '')
raises(KeyError, self._dst.load, obj1._p_oid, '')
raises(KeyError, self._dst.load, obj2._p_oid, '')
...@@ -488,6 +488,7 @@ class VersionStorage: ...@@ -488,6 +488,7 @@ class VersionStorage:
root["d"] = MinPO("d") root["d"] = MinPO("d")
get_transaction().commit() get_transaction().commit()
snooze()
self._storage.pack(time.time(), referencesf) self._storage.pack(time.time(), referencesf)
cn.sync() cn.sync()
......
...@@ -185,9 +185,7 @@ class LRUCacheTests(CacheTestBase): ...@@ -185,9 +185,7 @@ class LRUCacheTests(CacheTestBase):
self.assertEquals(len(details), CONNS) self.assertEquals(len(details), CONNS)
for d in details: for d in details:
self.assertEquals(d['ngsize'], CACHE_SIZE) self.assertEquals(d['ngsize'], CACHE_SIZE)
# the root is also in the cache as ghost, because self.assertEquals(d['size'], CACHE_SIZE)
# the connection holds a reference to it
self.assertEquals(d['size'], CACHE_SIZE + 1)
def checkDetail(self): def checkDetail(self):
CACHE_SIZE = 10 CACHE_SIZE = 10
......
...@@ -123,7 +123,7 @@ class BDBConfigTest(ConfigTestBase): ...@@ -123,7 +123,7 @@ class BDBConfigTest(ConfigTestBase):
cfg = """ cfg = """
<zodb> <zodb>
<fullstorage> <fullstorage>
name %s envdir %s
</fullstorage> </fullstorage>
</zodb> </zodb>
""" % self._path """ % self._path
...@@ -133,7 +133,7 @@ class BDBConfigTest(ConfigTestBase): ...@@ -133,7 +133,7 @@ class BDBConfigTest(ConfigTestBase):
cfg = """ cfg = """
<zodb> <zodb>
<minimalstorage> <minimalstorage>
name %s envdir %s
</minimalstorage> </minimalstorage>
</zodb> </zodb>
""" % self._path """ % self._path
......
...@@ -88,7 +88,7 @@ class FileStorageTests( ...@@ -88,7 +88,7 @@ class FileStorageTests(
class OldFileStorage(ZODB.FileStorage.FileStorage): class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self): def _newIndexes(self):
return {}, {}, {}, {} return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex from ZODB.fsIndex import fsIndex
...@@ -113,7 +113,7 @@ class FileStorageTests( ...@@ -113,7 +113,7 @@ class FileStorageTests(
class OldFileStorage(ZODB.FileStorage.FileStorage): class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self): def _newIndexes(self):
return {}, {}, {}, {} return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex from ZODB.fsIndex import fsIndex
......
This directory contains a collection of utilities for managing ZODB
databases. Some are more useful than others. If you install ZODB
using distutils ("python setup.py install"), fsdump.py, fstest.py,
repozo.py, and zeopack.py will be installed in /usr/local/bin.
Unless otherwise noted, these scripts are invoked with the name of the
Data.fs file as their only argument. Example: checkbtrees.py data.fs.
analyze.py -- A transaction analyzer for FileStorage
Reports on the data in a FileStorage. The report is organized by
class. It shows total data, as well as separate reports for current
and historical revisions of objects.
checkbtrees.py -- Checks BTrees in a FileStorage for corruption.
Attempts to find all the BTrees contained in a Data.fs and calls their
_check() methods.
fsdump.py -- Summarize FileStorage contents, one line per revision.
Prints a report of FileStorage contents, with one line for each
transaction and one line for each data record in that transaction.
Includes time stamps, file positions, and class names.
fstest.py -- Simple consistency checker for FileStorage
usage: fstest.py [-v] data.fs
The fstest tool will scan all the data in a FileStorage and report an
error if it finds any corrupt transaction data. The tool will print a
message when the first error is detected, then exit.
The tool accepts one or more -v arguments. If a single -v is used, it
will print a line of text for each transaction record it encounters.
If two -v arguments are used, it will also print a line of text for
each object. The objects for a transaction will be printed before the
transaction itself.
Note: It does not check the consistency of the object pickles. It is
possible for the damage to occur only in the part of the file that
stores object pickles. Those errors will go undetected.
netspace.py -- Hackish attempt to report on size of objects
usage: netspace.py [-P | -v] data.fs
-P: do a pack first
-v: print info for all objects, even if a traversal path isn't found
Traverses objects from the database root and attempts to calculate
the size of each object, including all reachable subobjects.
parsezeolog.py -- Parse BLATHER logs from ZEO server.
This script may be obsolete. It has not been tested against the
current log output of the ZEO server.
Reports on the time and size of transactions committed by a ZEO
server, by inspecting log messages at BLATHER level.
repozo.py -- Incremental backup utility for FileStorage.
Run the script with the -h option to see usage details.
timeout.py -- Script to test transaction timeout
usage: timeout.py address delay [storage-name]
This script connects to a storage, begins a transaction, calls store()
and tpc_vote(), and then sleeps forever. This should trigger the
transaction timeout feature of the server.
zeopack.py -- Script to pack a ZEO server.
The script connects to a server and calls pack() on a specific
storage. See the script for usage details.
zeoreplay.py -- Experimental script to replay transactions from a ZEO log.
Like parsezeolog.py, this may be obsolete because it was written
against an earlier version of the ZEO server. See the script for
usage details.
zeoup.py
Usage: zeoup.py [options]
The test will connect to a ZEO server, load the root object, and
attempt to update the zeoup counter in the root. It will report
success if it updates to counter or if it gets a ConflictError. A
ConflictError is considered a success, because the client was able to
start a transaction.
See the script for details about the options.
zodbload.py - exercise ZODB under a heavy synthesized Zope-like load
See the module docstring for details. Note that this script requires
Zope. New in ZODB3 3.1.4.
zeoserverlog.py - analyze ZEO server log for performance statistics
See the module docstring for details; there are a large number of
options. New in ZODB3 3.1.4.
\ No newline at end of file
#! /usr/bin/env python #!python
# Based on a transaction analyzer by Matt Kromer. # Based on a transaction analyzer by Matt Kromer.
import pickle import pickle
...@@ -137,4 +137,3 @@ def analyze_rec(report, record): ...@@ -137,4 +137,3 @@ def analyze_rec(report, record):
if __name__ == "__main__": if __name__ == "__main__":
path = sys.argv[1] path = sys.argv[1]
report(analyze(path)) report(analyze(path))
#! /usr/bin/env python #!python
"""Check the consistency of BTrees in a Data.fs """Check the consistency of BTrees in a Data.fs
usage: checkbtrees.py data.fs usage: checkbtrees.py data.fs
Try to find all the BTrees in a Data.fs and call their _check() methods. Try to find all the BTrees in a Data.fs, call their _check() methods,
and run them through BTrees.check.check().
""" """
from types import IntType from types import IntType
import ZODB import ZODB
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from BTrees.check import check
# Set of oids we've already visited. Since the object structure is
# a general graph, this is needed to prevent unbounded paths in the
# presence of cycles. It's also helpful in eliminating redundant
# checking when a BTree is pointed to by many objects.
oids_seen = {}
# Append (obj, path) to L if and only if obj is a persistent object
# and we haven't seen it before.
def add_if_new_persistent(L, obj, path):
global oids_seen
def add_if_persistent(L, obj, path):
getattr(obj, '_', None) # unghostify getattr(obj, '_', None) # unghostify
if hasattr(obj, '_p_oid'): if hasattr(obj, '_p_oid'):
L.append((obj, path)) oid = obj._p_oid
if not oids_seen.has_key(oid):
L.append((obj, path))
oids_seen[oid] = 1
def get_subobjects(obj): def get_subobjects(obj):
getattr(obj, '_', None) # unghostify getattr(obj, '_', None) # unghostify
...@@ -25,7 +40,7 @@ def get_subobjects(obj): ...@@ -25,7 +40,7 @@ def get_subobjects(obj):
attrs = () attrs = ()
for pair in attrs: for pair in attrs:
sub.append(pair) sub.append(pair)
# what if it is a mapping? # what if it is a mapping?
try: try:
items = obj.items() items = obj.items()
...@@ -54,7 +69,7 @@ def main(fname): ...@@ -54,7 +69,7 @@ def main(fname):
cn = ZODB.DB(fs).open() cn = ZODB.DB(fs).open()
rt = cn.root() rt = cn.root()
todo = [] todo = []
add_if_persistent(todo, rt, '') add_if_new_persistent(todo, rt, '')
found = 0 found = 0
while todo: while todo:
...@@ -75,6 +90,13 @@ def main(fname): ...@@ -75,6 +90,13 @@ def main(fname):
print msg print msg
print "*" * 60 print "*" * 60
try:
check(obj)
except AssertionError, msg:
print "*" * 60
print msg
print "*" * 60
if found % 100 == 0: if found % 100 == 0:
cn.cacheMinimize() cn.cacheMinimize()
...@@ -84,7 +106,7 @@ def main(fname): ...@@ -84,7 +106,7 @@ def main(fname):
newpath = "%s%s" % (path, k) newpath = "%s%s" % (path, k)
else: else:
newpath = "%s.%s" % (path, k) newpath = "%s.%s" % (path, k)
add_if_persistent(todo, v, newpath) add_if_new_persistent(todo, v, newpath)
print "total", len(fs._index), "found", found print "total", len(fs._index), "found", found
......
...@@ -4,14 +4,14 @@ ...@@ -4,14 +4,14 @@
# #
# Copyright (c) 2002 Zope Corporation and Contributors. # Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved. # All Rights Reserved.
# #
# This software is subject to the provisions of the Zope Public License, # This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. # Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""Check FileStorage for dangling references. """Check FileStorage for dangling references.
...@@ -77,7 +77,7 @@ def main(path): ...@@ -77,7 +77,7 @@ def main(path):
# that refer to this one, we won't get error reports from # that refer to this one, we won't get error reports from
# them. We could fix this by making two passes over the # them. We could fix this by making two passes over the
# storage, but that seems like overkill. # storage, but that seems like overkill.
refs = get_refs(data) refs = get_refs(data)
missing = [] # contains 3-tuples of oid, klass-metadata, reason missing = [] # contains 3-tuples of oid, klass-metadata, reason
for info in refs: for info in refs:
......
...@@ -4,14 +4,14 @@ ...@@ -4,14 +4,14 @@
# #
# Copyright (c) 2001, 2002 Zope Corporation and Contributors. # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved. # All Rights Reserved.
# #
# This software is subject to the provisions of the Zope Public License, # This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. # Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""Simple consistency checker for FileStorage. """Simple consistency checker for FileStorage.
...@@ -109,7 +109,7 @@ def check_trec(path, file, pos, ltid, file_size): ...@@ -109,7 +109,7 @@ def check_trec(path, file, pos, ltid, file_size):
It also leaves the file pointer set to pos. The path argument is It also leaves the file pointer set to pos. The path argument is
used for generating error messages. used for generating error messages.
""" """
h = file.read(TREC_HDR_LEN) h = file.read(TREC_HDR_LEN)
if not h: if not h:
return None, None return None, None
...@@ -131,7 +131,7 @@ def check_trec(path, file, pos, ltid, file_size): ...@@ -131,7 +131,7 @@ def check_trec(path, file, pos, ltid, file_size):
raise FormatError("%s truncated possibly because of" raise FormatError("%s truncated possibly because of"
" damaged records at %s" % (path, pos)) " damaged records at %s" % (path, pos))
if status == Status.checkpoint: if status == Status.checkpoint:
raise FormatError("%s checkpoint flag was not cleared at %s" raise FormatError("%s checkpoint flag was not cleared at %s"
% (path, pos)) % (path, pos))
if status not in ' up': if status not in ' up':
raise FormatError("%s has invalid status '%s' at %s" % raise FormatError("%s has invalid status '%s' at %s" %
......
#! /usr/bin/env python #!python
############################################################################## ##############################################################################
# #
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors. # Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
......
#!python
"""Report on the net size of objects counting subobjects. """Report on the net size of objects counting subobjects.
usage: netspace.py [-P | -v] data.fs usage: netspace.py [-P | -v] data.fs
...@@ -89,7 +90,7 @@ def main(path): ...@@ -89,7 +90,7 @@ def main(path):
keys = filter(paths.has_key, keys) keys = filter(paths.has_key, keys)
fmt = "%8s %5d %8d %s %s.%s" fmt = "%8s %5d %8d %s %s.%s"
for oid in keys: for oid in keys:
data, serialno = fs.load(oid, '') data, serialno = fs.load(oid, '')
mod, klass = get_pickle_metadata(data) mod, klass = get_pickle_metadata(data)
......
#!python
"""Parse the BLATHER logging generated by ZEO2. """Parse the BLATHER logging generated by ZEO2.
An example of the log format is: An example of the log format is:
...@@ -52,7 +53,7 @@ class TStats: ...@@ -52,7 +53,7 @@ class TStats:
fields = ("time", "vote", "done", "user", "path") fields = ("time", "vote", "done", "user", "path")
fmt = "%-24s %5s %5s %-15s %s" fmt = "%-24s %5s %5s %-15s %s"
hdr = fmt % fields hdr = fmt % fields
def report(self): def report(self):
"""Print a report about the transaction""" """Print a report about the transaction"""
t = time.ctime(self.begin) t = time.ctime(self.begin)
...@@ -98,7 +99,7 @@ class TransactionParser: ...@@ -98,7 +99,7 @@ class TransactionParser:
except KeyError: except KeyError:
print "uknown tid", repr(tid) print "uknown tid", repr(tid)
return None return None
def tpc_finish(self, time, args): def tpc_finish(self, time, args):
t = self.get_txn(args) t = self.get_txn(args)
if t is None: if t is None:
......
#!/usr/bin/env python #!python
# repozo.py -- incremental and full backups of a Data.fs file. # repozo.py -- incremental and full backups of a Data.fs file.
# #
# Originally written by Anthony Baxter # Originally written by Anthony Baxter
# Significantly modified by Barry Warsaw # Significantly modified by Barry Warsaw
#
# TODO:
# allow gzipping of backup files.
# allow backup files in subdirectories.
"""repozo.py -- incremental and full backups of a Data.fs file. """repozo.py -- incremental and full backups of a Data.fs file.
Usage: %(program)s [options] Usage: %(program)s [options]
Where: Where:
Exactly one of -B or -R must be specified:
-B / --backup -B / --backup
backup current ZODB file Backup current ZODB file.
-R / --recover -R / --recover
restore a ZODB file from a backup Restore a ZODB file from a backup.
-v / --verbose -v / --verbose
Verbose mode Verbose mode.
-h / --help -h / --help
Print this text and exit Print this text and exit.
-r dir -r dir
--repository=dir --repository=dir
Repository directory containing the backup files Repository directory containing the backup files. This argument
is required.
Flags for --backup: Options for -B/--backup:
-f file -f file
--file=file --file=file
Source Data.fs file Source Data.fs file. This argument is required.
-F / --full -F / --full
Force a full backup Force a full backup. By default, an incremental backup is made
if possible (e.g., if a pack has occurred since the last
incremental backup, a full backup is necessary).
-Q / --quick -Q / --quick
Verify via md5 checksum only the last incremental written. This Verify via md5 checksum only the last incremental written. This
significantly reduces the disk i/o at the (theoretical) cost of significantly reduces the disk i/o at the (theoretical) cost of
inconsistency. inconsistency. This is a probabilistic way of determining whether
a full backup is necessary.
-z / --gzip -z / --gzip
Compress with gzip the backup files. Uses the default zlib Compress with gzip the backup files. Uses the default zlib
compression level. compression level. By default, gzip compression is not used.
Flags for --recover: Options for -R/--recover:
-D str -D str
--date=str --date=str
Recover state as at this date. str is in the format Recover state as of this date. str is in the format
yyyy-mm-dd[-hh[-mm]] yyyy-mm-dd[-hh[-mm]]
By default, current time is used.
-o file -o filename
--output=file --output=filename
Write recovered ZODB to given file. If not given, the file will be Write recovered ZODB to given file. By default, the file is
written to stdout. written to stdout.
One of --backup or --recover is required.
""" """
from __future__ import nested_scopes from __future__ import nested_scopes
...@@ -120,14 +121,14 @@ def parseargs(): ...@@ -120,14 +121,14 @@ def parseargs():
usage(1, msg) usage(1, msg)
class Options: class Options:
mode = None mode = None # BACKUP or RECOVER
file = None file = None # name of input Data.fs file
repository = None repository = None # name of directory holding backups
full = False full = False # True forces full backup
date = None date = None # -D argument, if any
output = None output = None # where to write recovered data; None = stdout
quick = False quick = False # -Q flag state
gzip = False gzip = False # -z flag state
options = Options() options = Options()
...@@ -158,6 +159,8 @@ def parseargs(): ...@@ -158,6 +159,8 @@ def parseargs():
options.output = arg options.output = arg
elif opt in ('-z', '--gzip'): elif opt in ('-z', '--gzip'):
options.gzip = True options.gzip = True
else:
assert False, (opt, arg)
# Any other arguments are invalid # Any other arguments are invalid
if args: if args:
...@@ -184,20 +187,26 @@ def parseargs(): ...@@ -184,20 +187,26 @@ def parseargs():
# Do something with a run of bytes from a file # Read bytes (no more than n, or to EOF if n is None) in chunks from the
# current position in file fp. Pass each chunk as an argument to func().
# Return the total number of bytes read == the total number of bytes
# passed in all to func(). Leaves the file position just after the
# last byte read.
def dofile(func, fp, n=None): def dofile(func, fp, n=None):
bytesread = 0 bytesread = 0L
stop = False while n is None or n > 0:
chunklen = READCHUNK if n is None:
while not stop: todo = READCHUNK
if n is not None and chunklen + bytesread > n: else:
chunklen = n - bytesread todo = min(READCHUNK, n)
stop = True data = fp.read(todo)
data = fp.read(chunklen)
if not data: if not data:
break break
func(data) func(data)
bytesread += len(data) nread = len(data)
bytesread += nread
if n is not None:
n -= nread
return bytesread return bytesread
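(Editor's sketch, not part of the diff: how a caller typically uses dofile(). The helper name is hypothetical; it hashes the next n bytes of an open file in READCHUNK-sized pieces, leaving the file position just past the last byte read, much like the checksum() calls in do_backup() below.)
import md5
def md5_of_next_bytes(fp, n):
    digest = md5.new()
    def func(data):
        # dofile() hands each chunk to this callback in order.
        digest.update(data)
    dofile(func, fp, n)
    return digest.hexdigest()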
...@@ -223,9 +232,10 @@ def copyfile(options, dst, start, n): ...@@ -223,9 +232,10 @@ def copyfile(options, dst, start, n):
def func(data): def func(data):
sum.update(data) sum.update(data)
ofp.write(data) ofp.write(data)
dofile(func, ifp, n) ndone = dofile(func, ifp, n)
ofp.close() ofp.close()
ifp.close() ifp.close()
assert ndone == n
return sum.hexdigest() return sum.hexdigest()
...@@ -296,30 +306,34 @@ def find_files(options): ...@@ -296,30 +306,34 @@ def find_files(options):
log('no files found') log('no files found')
return needed return needed
# Scan the .dat file corresponding to the last full backup performed.
# Return
#
# filename, startpos, endpos, checksum
#
# of the last incremental. If there is no .dat file, or the .dat file
# is empty, return
#
# None, None, None, None
def scandat(repofiles): def scandat(repofiles):
# Scan the .dat file corresponding to the last full backup performed.
# Return the filename, startpos, endpos, and sum of the last incremental.
# If all is a list, then append file name and md5sums to the list.
fullfile = repofiles[0] fullfile = repofiles[0]
datfile = os.path.splitext(fullfile)[0] + '.dat' datfile = os.path.splitext(fullfile)[0] + '.dat'
# If the .dat file is missing, we have to do a full backup fn = startpos = endpos = sum = None # assume .dat file missing or empty
fn = startpos = endpos = sum = None
try: try:
fp = open(datfile) fp = open(datfile)
except IOError, e: except IOError, e:
if e.errno <> errno.ENOENT: if e.errno <> errno.ENOENT:
raise raise
else: else:
while True: # We only care about the last one.
line = fp.readline() lines = fp.readlines()
if not line:
break
# We only care about the last one
fn, startpos, endpos, sum = line.split()
fp.close() fp.close()
startpos = long(startpos) if lines:
endpos = long(endpos) fn, startpos, endpos, sum = lines[-1].split()
startpos = long(startpos)
endpos = long(endpos)
return fn, startpos, endpos, sum return fn, startpos, endpos, sum
...@@ -364,7 +378,7 @@ def do_incremental_backup(options, reposz, repofiles): ...@@ -364,7 +378,7 @@ def do_incremental_backup(options, reposz, repofiles):
print >> sys.stderr, 'Cannot overwrite existing file:', dest print >> sys.stderr, 'Cannot overwrite existing file:', dest
sys.exit(2) sys.exit(2)
log('writing incremental: %s bytes to %s', pos-reposz, dest) log('writing incremental: %s bytes to %s', pos-reposz, dest)
sum = copyfile(options, dest, reposz, pos) sum = copyfile(options, dest, reposz, pos - reposz)
# The first file in repofiles points to the last full backup. Use this to # The first file in repofiles points to the last full backup. Use this to
# get the .dat file and append the information for this incrementatl to # get the .dat file and append the information for this incrementatl to
# that file. # that file.
...@@ -398,14 +412,18 @@ def do_backup(options): ...@@ -398,14 +412,18 @@ def do_backup(options):
return return
# Now check the md5 sum of the source file, from the last # Now check the md5 sum of the source file, from the last
# incremental's start and stop positions. # incremental's start and stop positions.
srcfp = open(options.file) srcfp = open(options.file, 'rb')
srcfp.seek(startpos) srcfp.seek(startpos)
srcsum = checksum(srcfp, endpos-startpos) srcsum = checksum(srcfp, endpos-startpos)
srcfp.close()
log('last incremental file: %s', fn) log('last incremental file: %s', fn)
log('last incremental checksum: %s', sum) log('last incremental checksum: %s', sum)
log('source checksum range: [%s..%s], sum: %s', log('source checksum range: [%s..%s], sum: %s',
startpos, endpos, srcsum) startpos, endpos, srcsum)
if sum == srcsum: if sum == srcsum:
if srcsz == endpos:
log('No changes, nothing to do')
return
log('doing incremental, starting at: %s', endpos) log('doing incremental, starting at: %s', endpos)
do_incremental_backup(options, endpos, repofiles) do_incremental_backup(options, endpos, repofiles)
return return
...@@ -421,7 +439,7 @@ def do_backup(options): ...@@ -421,7 +439,7 @@ def do_backup(options):
# Get the md5 checksum of the source file, up to two file positions: # Get the md5 checksum of the source file, up to two file positions:
# the entire size of the file, and up to the file position of the last # the entire size of the file, and up to the file position of the last
# incremental backup. # incremental backup.
srcfp = open(options.file) srcfp = open(options.file, 'rb')
srcsum = checksum(srcfp, srcsz) srcsum = checksum(srcfp, srcsz)
srcfp.seek(0) srcfp.seek(0)
srcsum_backedup = checksum(srcfp, reposz) srcsum_backedup = checksum(srcfp, reposz)
......
#! /usr/bin/env python
"""Report on the space used by objects in a storage.
usage: space.py data.fs
The current implementation only supports FileStorage.
Current limitations / simplifications: Ignores revisions and versions.
"""
import ZODB
from ZODB.FileStorage import FileStorage
from ZODB.utils import U64
from ZODB.fsdump import get_pickle_metadata
def run(path, v=0):
fs = FileStorage(path, read_only=1)
# break into the file implementation
if hasattr(fs._index, 'iterkeys'):
iter = fs._index.iterkeys()
else:
iter = fs._index.keys()
totals = {}
for oid in iter:
data, serialno = fs.load(oid, '')
mod, klass = get_pickle_metadata(data)
key = "%s.%s" % (mod, klass)
bytes, count = totals.get(key, (0, 0))
bytes += len(data)
count += 1
totals[key] = bytes, count
if v:
print "%8s %5d %s" % (U64(oid), len(data), key)
L = totals.items()
L.sort(lambda a, b: cmp(a[1], b[1]))
L.reverse()
print "Totals per object class:"
for key, (bytes, count) in L:
print "%8d %8d %s" % (count, bytes, key)
def main():
import sys
import getopt
try:
opts, args = getopt.getopt(sys.argv[1:], "v")
except getopt.error, msg:
print msg
print "usage: space.py [-v] Data.fs"
sys.exit(2)
if len(args) != 1:
print "usage: space.py [-v] Data.fs"
sys.exit(2)
v = 0
for o, a in opts:
if o == "-v":
v += 1
path = args[0]
run(path, v)
if __name__ == "__main__":
main()
#!python
"""Transaction timeout test script.
This script connects to a storage, begins a transaction, calls store()
and tpc_vote(), and then sleeps forever. This should trigger the
transaction timeout feature of the server.
usage: timeout.py address delay [storage-name]
"""
import sys
import time
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle
from ZEO.ClientStorage import ClientStorage
ZERO = '\0'*8
def main():
if len(sys.argv) not in (3, 4):
sys.stderr.write("Usage: %s address delay [storage-name]\n" %
sys.argv[0])
sys.exit(2)
hostport = sys.argv[1]
delay = float(sys.argv[2])
if sys.argv[3:]:
name = sys.argv[3]
else:
name = "1"
if "/" in hostport:
address = hostport
else:
if ":" in hostport:
i = hostport.index(":")
host, port = hostport[:i], hostport[i+1:]
else:
host, port = "", hostport
port = int(port)
address = (host, port)
print "Connecting to %s..." % repr(address)
storage = ClientStorage(address, name)
print "Connected. Now starting a transaction..."
oid = storage.new_oid()
version = ""
revid = ZERO
data = MinPO("timeout.py")
pickled_data = zodb_pickle(data)
t = Transaction()
t.user = "timeout.py"
storage.tpc_begin(t)
storage.store(oid, revid, pickled_data, version, t)
print "Stored. Now voting..."
storage.tpc_vote(t)
print "Voted; now sleeping %s..." % delay
time.sleep(delay)
print "Done."
if __name__ == "__main__":
main()
#! /usr/bin/env python #!python
"""Connect to a ZEO server and ask it to pack. """Connect to a ZEO server and ask it to pack.
Usage: zeopack.py [options] Usage: zeopack.py [options]
...@@ -6,11 +6,11 @@ Usage: zeopack.py [options] ...@@ -6,11 +6,11 @@ Usage: zeopack.py [options]
Options: Options:
-p port -- port to connect to -p port -- port to connect to
-h host -- host to connect to (default is current host) -h host -- host to connect to (default is current host)
-U path -- Unix-domain socket to connect to -U path -- Unix-domain socket to connect to
-S name -- storage name (default is '1') -S name -- storage name (default is '1')
-d days -- pack objects more than days old -d days -- pack objects more than days old
......
#! /usr/bin/env python #!python
"""Report on the number of currently waiting clients in the ZEO queue. """Report on the number of currently waiting clients in the ZEO queue.
Usage: %(PROGRAM)s [options] logfile Usage: %(PROGRAM)s [options] logfile
......
#!python
"""Parse the BLATHER logging generated by ZEO, and optionally replay it. """Parse the BLATHER logging generated by ZEO, and optionally replay it.
Usage: zeointervals.py [options] Usage: zeointervals.py [options]
...@@ -186,7 +187,7 @@ class ZEOParser: ...@@ -186,7 +187,7 @@ class ZEOParser:
meth = getattr(txn, 'tpc_begin', None) meth = getattr(txn, 'tpc_begin', None)
if meth is not None: if meth is not None:
meth(when, args, client) meth(when, args, client)
def storea(self, when, args, client): def storea(self, when, args, client):
txn = self.__curtxn.get(client) txn = self.__curtxn.get(client)
if txn is None: if txn is None:
...@@ -221,7 +222,7 @@ class ZEOParser: ...@@ -221,7 +222,7 @@ class ZEOParser:
print '%s %s %4d %10d %s %s' % ( print '%s %s %4d %10d %s %s' % (
txn._begintime, txn._finishtime - txn._begintime, txn._begintime, txn._finishtime - txn._begintime,
len(txn._objects), len(txn._objects),
bytes, bytes,
time.ctime(txn._begintime), time.ctime(txn._begintime),
txn._url) txn._url)
...@@ -281,7 +282,7 @@ def main(): ...@@ -281,7 +282,7 @@ def main():
if replay: if replay:
storage = FileStorage(storagefile) storage = FileStorage(storagefile)
#storage = BDBFullStorage(storagefile) #storage = BDBFullStorage(storagefile)
#storage = PrimaryStorage('yyz', storage, RS_PORT) #storage = PrimaryStorage('yyz', storage, RS_PORT)
t0 = now() t0 = now()
p = ZEOParser(maxtxns, report, storage) p = ZEOParser(maxtxns, report, storage)
......
#!python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tools for analyzing ZEO Server logs.
This script contains a number of commands, implemented by command
functions. To run a command, give the command name and its arguments
as arguments to this script.
Commands:
blocked_times file threshold
Output a summary of episodes where transactions were blocked
when the episode lasted at least threshold seconds.
The file may be a file name or - to read from standard input.
The file may also be a command:
script blocked_times 'bunzip2 <foo.log.bz2' 60
If the file is a command, it must contain at least a single
space.
The columns of output are:
- The time the episode started
- The seconds from the start of the episode until the blocking
transaction finished.
- The client id (host and port) of the blocking transaction.
- The seconds from the start of the episode until the end of the
episode.
time_calls file threshold
Time how long calls took. Note that this is normally combined
with grep to time just a particular kind of call:
script time_calls 'bunzip2 <foo.log.bz2 | grep tpc_finish' 10
The columns of output are:
- The time of the call invocation
- The seconds from the call to the return
- The client that made the call.
time_trans file threshold
Output a summary of transactions that held the global transaction
lock for at least threshold seconds. (This is the time from when
voting starts until the transaction is completed by the server.)
The columns of output are:
- time that the vote started.
- client id
- number of objects written / number of objects updated
- seconds from tpc_begin to vote start
- seconds spent voting
- vote status: n=normal, d=delayed, e=error
- seconds waiting between vote return and finish call
- time spent finishing or 'abort' if the transaction aborted
minute file
Compute production statistics by minute
The columns of output are:
- date/time
- Number of active clients
- number of reads
- number of stores
- number of commits (finish)
- number of aborts
- number of transactions (commits + aborts)
Summary statistics are printed at the end
minutes file
Show just the summary statistics for production by minute.
hour file
Compute production statistics by hour
hours file
Show just the summary statistics for production by hour.
day file
Compute production statistics by day
days file
Show just the summary statistics for production by day.
verify file
Compute verification statistics
The columns of output are:
- client id
- verification start time
- number of objects verified
- wall time to verify
- average milliseconds to verify per object.
$Id: zeoserverlog.py,v 1.2 2003/09/15 16:29:19 jeremy Exp $
"""
import datetime, sys, re, os
def time(line):
d = line[:10]
t = line[11:19]
y, mo, d = map(int, d.split('-'))
h, mi, s = map(int, t.split(':'))
return datetime.datetime(y, mo, d, h, mi, s)
def sub(t1, t2):
delta = t2 - t1
return delta.days*86400.0+delta.seconds+delta.microseconds/1000000.0
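(Editor's illustration, not part of the diff: the log lines below are hypothetical, but they show the 'YYYY-MM-DD HH:MM:SS' prefix that time() slices out and the seconds that sub() computes between two such timestamps.)
t1 = time('2003-09-15 16:29:15 INFO ZSS:12345/10.0.0.1:4001 calling vote()')
t2 = time('2003-09-15 16:29:42 INFO ZSS:12345/10.0.0.1:4001 vote returns None')
print sub(t1, t2)    # prints 27.0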
waitre = re.compile(r'Clients waiting: (\d+)')
idre = re.compile(r' ZSS:\d+/(\d+.\d+.\d+.\d+:\d+) ')
def blocked_times(args):
f, thresh = args
t1 = t2 = cid = blocking = waiting = 0
last_blocking = False
thresh = int(thresh)
for line in xopen(f):
line = line.strip()
if line.endswith('Blocked transaction restarted.'):
blocking = False
waiting = 0
else:
s = waitre.search(line)
if not s:
continue
waiting = int(s.group(1))
blocking = line.find(
'Transaction blocked waiting for storage') >= 0
if blocking and waiting == 1:
t1 = time(line)
t2 = t1
if not blocking and last_blocking:
last_wait = 0
t2 = time(line)
cid = idre.search(line).group(1)
if waiting == 0:
d = sub(t1, time(line))
if d >= thresh:
print t1, sub(t1, t2), cid, d
t1 = t2 = cid = blocking = waiting = last_wait = max_wait = 0
last_blocking = blocking
connidre = re.compile(r' zrpc-conn:(\d+.\d+.\d+.\d+:\d+) ')
def time_calls(f):
f, thresh = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
thresh = float(thresh)
t1 = None
maxd = 0
for line in f:
line = line.strip()
if ' calling ' in line:
t1 = time(line)
elif ' returns ' in line and t1 is not None:
d = sub(t1, time(line))
if d >= thresh:
print t1, d, connidre.search(line).group(1)
maxd = max(maxd, d)
t1 = None
print maxd
def xopen(f):
if f == '-':
return sys.stdin
if ' ' in f:
return os.popen(f, 'r')
return open(f)
def time_tpc(f):
f, thresh = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
thresh = float(thresh)
transactions = {}
for line in f:
line = line.strip()
if ' calling vote(' in line:
cid = connidre.search(line).group(1)
transactions[cid] = time(line),
elif ' vote returns None' in line:
cid = connidre.search(line).group(1)
transactions[cid] += time(line), 'n'
elif ' vote() raised' in line:
cid = connidre.search(line).group(1)
transactions[cid] += time(line), 'e'
elif ' vote returns ' in line:
# delayed, skip
cid = connidre.search(line).group(1)
transactions[cid] += time(line), 'd'
elif ' calling tpc_abort(' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
t1, t2, vs = transactions[cid]
t = time(line)
d = sub(t1, t)
if d >= thresh:
print 'a', t1, cid, sub(t1, t2), vs, sub(t2, t)
del transactions[cid]
elif ' calling tpc_finish(' in line:
if cid in transactions:
cid = connidre.search(line).group(1)
transactions[cid] += time(line),
elif ' tpc_finish returns ' in line:
if cid in transactions:
t1, t2, vs, t3 = transactions[cid]
t = time(line)
d = sub(t1, t)
if d >= thresh:
print 'c', t1, cid, sub(t1, t2), vs, sub(t2, t3), sub(t3, t)
del transactions[cid]
newobre = re.compile(r"storea\(.*, '\\x00\\x00\\x00\\x00\\x00")
def time_trans(f):
f, thresh = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
thresh = float(thresh)
transactions = {}
for line in f:
line = line.strip()
if ' calling tpc_begin(' in line:
cid = connidre.search(line).group(1)
transactions[cid] = time(line), [0, 0]
if ' calling storea(' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid][1][0] += 1
if not newobre.search(line):
transactions[cid][1][1] += 1
elif ' calling vote(' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid] += time(line),
elif ' vote returns None' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid] += time(line), 'n'
elif ' vote() raised' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid] += time(line), 'e'
elif ' vote returns ' in line:
# delayed, skip
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid] += time(line), 'd'
elif ' calling tpc_abort(' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
try:
t0, (stores, old), t1, t2, vs = transactions[cid]
except ValueError:
pass
else:
t = time(line)
d = sub(t1, t)
if d >= thresh:
print t1, cid, "%s/%s" % (stores, old), \
sub(t0, t1), sub(t1, t2), vs, \
sub(t2, t), 'abort'
del transactions[cid]
elif ' calling tpc_finish(' in line:
if cid in transactions:
cid = connidre.search(line).group(1)
transactions[cid] += time(line),
elif ' tpc_finish returns ' in line:
if cid in transactions:
t0, (stores, old), t1, t2, vs, t3 = transactions[cid]
t = time(line)
d = sub(t1, t)
if d >= thresh:
print t1, cid, "%s/%s" % (stores, old), \
sub(t0, t1), sub(t1, t2), vs, \
sub(t2, t3), sub(t3, t)
del transactions[cid]
def minute(f, slice=16, detail=1, summary=1):
f, = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
mlast = r = s = c = a = cl = None
rs = []
ss = []
cs = []
as = []
ts = []
cls = []
for line in f:
line = line.strip()
if (line.find('returns') > 0
or line.find('storea') > 0
or line.find('tpc_abort') > 0
):
client = connidre.search(line).group(1)
m = line[:slice]
if m != mlast:
if mlast:
if detail:
print mlast, len(cl), r, s, c, a, a+c
cls.append(len(cl))
rs.append(r)
ss.append(s)
cs.append(c)
as.append(a)
ts.append(c+a)
mlast = m
r = s = c = a = 0
cl = {}
if line.find('zeoLoad') > 0:
r += 1
cl[client] = 1
elif line.find('storea') > 0:
s += 1
cl[client] = 1
elif line.find('tpc_finish') > 0:
c += 1
cl[client] = 1
elif line.find('tpc_abort') > 0:
a += 1
cl[client] = 1
if mlast:
if detail:
print mlast, len(cl), r, s, c, a, a+c
cls.append(len(cl))
rs.append(r)
ss.append(s)
cs.append(c)
as.append(a)
ts.append(c+a)
if summary:
print
print 'Summary: \t', '\t'.join(('min', '10%', '25%', 'med',
'75%', '90%', 'max', 'mean'))
print "n=%6d\t" % len(cls), '-'*62
print 'Clients: \t', '\t'.join(map(str,stats(cls)))
print 'Reads: \t', '\t'.join(map(str,stats( rs)))
print 'Stores: \t', '\t'.join(map(str,stats( ss)))
print 'Commits: \t', '\t'.join(map(str,stats( cs)))
print 'Aborts: \t', '\t'.join(map(str,stats( as)))
print 'Trans: \t', '\t'.join(map(str,stats( ts)))
def stats(s):
s.sort()
min = s[0]
max = s[-1]
n = len(s)
out = [min]
ni = n + 1
for p in .1, .25, .5, .75, .90:
lp = ni*p
l = int(lp)
if lp < 1 or lp > n:
out.append('-')
elif abs(lp-l) < .00001:
out.append(s[l-1])
else:
out.append(int(s[l-1] + (lp - l) * (s[l] - s[l-1])))
mean = 0.0
for v in s:
mean += v
out.extend([max, int(mean/n)])
return out
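(Editor's illustration, not part of the diff: stats() returns, for a sample, the columns printed by the summary above, i.e. min, 10%, 25%, median, 75%, 90%, max, and mean, with percentiles linearly interpolated and truncated to ints.)
print stats(range(1, 11))   # prints [1, 1, 2, 5, 8, 9, 10, 5]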
def minutes(f):
minute(f, 16, detail=0)
def hour(f):
minute(f, 13)
def day(f):
minute(f, 10)
def hours(f):
minute(f, 13, detail=0)
def days(f):
minute(f, 10, detail=0)
new_connection_idre = re.compile(r"new connection \('(\d+.\d+.\d+.\d+)', (\d+)\):")
def verify(f):
f, = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
t1 = None
nv = {}
for line in f:
if line.find('new connection') > 0:
m = new_connection_idre.search(line)
cid = "%s:%s" % (m.group(1), m.group(2))
nv[cid] = [time(line), 0]
elif line.find('calling zeoVerify(') > 0:
cid = connidre.search(line).group(1)
nv[cid][1] += 1
elif line.find('calling endZeoVerify()') > 0:
cid = connidre.search(line).group(1)
t1, n = nv[cid]
if n:
d = sub(t1, time(line))
print cid, t1, n, d, n and (d*1000.0/n) or '-'
def recovery(f):
f, = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
last = ''
trans = []
n = 0
for line in f:
n += 1
if line.find('RecoveryServer') < 0:
continue
l = line.find('sending transaction ')
if l > 0 and last.find('sending transaction ') > 0:
trans.append(line[l+20:].strip())
else:
if trans:
if len(trans) > 1:
print " ... %s similar records skipped ..." % (
len(trans) - 1)
print n, last.strip()
trans=[]
print n, line.strip()
last = line
if len(trans) > 1:
print " ... %s similar records skipped ..." % (
len(trans) - 1)
print n, last.strip()
if __name__ == '__main__':
globals()[sys.argv[1]](sys.argv[2:])
#! /usr/bin/env python #!python
"""Make sure a ZEO server is running. """Make sure a ZEO server is running.
Usage: zeoup.py [options] usage: zeoup.py [options]
The test will connect to a ZEO server, load the root object, and attempt to The test will connect to a ZEO server, load the root object, and attempt to
update the zeoup counter in the root. It will report success if it updates update the zeoup counter in the root. It will report success if it updates
...@@ -11,11 +11,11 @@ success, because the client was able to start a transaction. ...@@ -11,11 +11,11 @@ success, because the client was able to start a transaction.
Options: Options:
-p port -- port to connect to -p port -- port to connect to
-h host -- host to connect to (default is current host) -h host -- host to connect to (default is current host)
-S storage -- storage name (default '1') -S storage -- storage name (default '1')
-U path -- Unix-domain socket to connect to -U path -- Unix-domain socket to connect to
--nowrite -- Do not update the zeoup counter. --nowrite -- Do not update the zeoup counter.
......
#!python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test script for testing ZODB under a heavy zope-like load.
Note that, to be as realistic as possible with ZEO, you should run this
script multiple times, to simulate multiple clients.
Here's how this works.
The script starts some number of threads. Each thread sequentially
executes jobs. There is a job producer that produces jobs.
Input data are provided by a mail producer that hands out messages from
a mailbox.
Execution continues until there is an error, which will normally occur
when the mailbox is exhausted.
Command-line options are used to provide job definitions. Job
definitions have parameters of the form name=value. Jobs have 2
standard parameters:
frequency=integer
The frequency of the job. The default is 1.
sleep=float
The number of seconds to sleep before performing the job. The
default is 0.
Usage: loadmail2 [options]
Options:
-edit [frequency=integer] [sleep=float]
Define an edit job. An edit job edits a random already-saved
email message, deleting and inserting a random number of words.
After editing the message, the message is (re)cataloged.
-insert [number=int] [frequency=integer] [sleep=float]
Insert some number of email messages.
-index [number=int] [frequency=integer] [sleep=float]
Insert and index (catalog) some number of email messages.
-search [terms='word1 word2 ...'] [frequency=integer] [sleep=float]
Search the catalog. A query is given with one or more terms as
would be entered into a typical search box. If no query is
given, then queries will be randomly selected from a built-in
word list.
-setup
Set up the database. This will delete any existing Data.fs
file. (Of course, this may have no effect if there is a
custom_zodb that defines a different storage.) It also adds a
mail folder and a catalog.
-options file
Read options from the given file. The file should be a Python
source file that defines a sequence of options named 'options'.
-threads n
Specify the number of threads to execute. If not specified (< 2),
then jobs are run in a single (main) thread.
-mbox filename
Specify the mailbox for getting input data.
$Id: zodbload.py,v 1.2 2003/09/15 16:29:19 jeremy Exp $
"""
import mailbox
import math
import os
import random
import re
import sys
import threading
import time
class JobProducer:
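# A weighted random scheduler: add() appends `frequency` copies of each
# job to self.jobs, so random.choice() in next() picks jobs in proportion
# to their configured frequency.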
def __init__(self):
self.jobs = []
def add(self, callable, frequency, sleep, repeatp=0):
self.jobs.extend([(callable, sleep, repeatp)] * int(frequency))
random.shuffle(self.jobs)
def next(self):
factory, sleep, repeatp = random.choice(self.jobs)
time.sleep(sleep)
callable, args = factory.create()
return factory, callable, args, repeatp
def __nonzero__(self):
return not not self.jobs
class MBox:
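# Wraps a Unix mailbox file.  The filename may be given as "path min max"
# to skip the first `min` messages and stop once message `max` is reached;
# a '.bz2' suffix causes the file to be read through bunzip2.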
def __init__(self, filename):
if ' ' in filename:
filename, min, max = filename.split()
min = int(min)
max = int(max)
else:
min = max = 0
if filename.endswith('.bz2'):
f = os.popen("bunzip2 <"+filename, 'r')
filename = filename[:-4]  # strip the '.bz2' suffix
else:
f = open(filename)
self._mbox = mb = mailbox.UnixMailbox(f)
self.number = min
while min:
mb.next()
min -= 1
self._lock = threading.Lock()
self.__name__ = os.path.splitext(os.path.split(filename)[1])[0]
self._max = max
def next(self):
self._lock.acquire()
try:
if self._max > 0 and self.number >= self._max:
raise IndexError(self.number + 1)
message = self._mbox.next()
message.body = message.fp.read()
message.headers = list(message.headers)
self.number += 1
message.number = self.number
message.mbox = self.__name__
return message
finally:
self._lock.release()
bins = 9973
#bins = 11
def mailfolder(app, mboxname, number):
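# Return the bin folder that message `number` belongs to, creating the
# mailbox folder (with a Length counter and `bins` sub-folders) on first
# use.  Messages are spread across bins by hashing their number.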
mail = getattr(app, mboxname, None)
if mail is None:
app.manage_addFolder(mboxname)
mail = getattr(app, mboxname)
from BTrees.Length import Length
mail.length = Length()
for i in range(bins):
mail.manage_addFolder('b'+str(i))
bin = hash(str(number))%bins
return getattr(mail, 'b'+str(bin))
def VmSize():
try:
f = open('/proc/%s/status' % os.getpid())
except:
return 0
else:
l = filter(lambda l: l[:7] == 'VmSize:', f.readlines())
if l:
l = l[0][7:].strip().split()[0]
return int(l)
return 0
def setup(lib_python):
try:
os.remove(os.path.join(lib_python, '..', '..', 'var', 'Data.fs'))
except:
pass
import Zope
import Products
import AccessControl.SecurityManagement
app=Zope.app()
Products.ZCatalog.ZCatalog.manage_addZCatalog(app, 'cat', '')
from Products.ZCTextIndex.ZCTextIndex import PLexicon
from Products.ZCTextIndex.Lexicon import Splitter, CaseNormalizer
app.cat._setObject('lex',
PLexicon('lex', '', Splitter(), CaseNormalizer())
)
class extra:
doc_attr = 'PrincipiaSearchSource'
lexicon_id = 'lex'
index_type = 'Okapi BM25 Rank'
app.cat.addIndex('PrincipiaSearchSource', 'ZCTextIndex', extra)
get_transaction().commit()
system = AccessControl.SpecialUsers.system
AccessControl.SecurityManagement.newSecurityManager(None, system)
app._p_jar.close()
def do(db, f, args):
"""Do something in a transaction, retrying of necessary
Measure the speed of both the compurartion and the commit
"""
from ZODB.POSException import ConflictError
wcomp = ccomp = wcommit = ccommit = 0.0
rconflicts = wconflicts = 0
start = time.time()
while 1:
connection = db.open()
try:
get_transaction().begin()
t=time.time()
c=time.clock()
try:
try:
r = f(connection, *args)
except ConflictError:
rconflicts += 1
get_transaction().abort()
continue
finally:
wcomp += time.time() - t
ccomp += time.clock() - c
t=time.time()
c=time.clock()
try:
try:
get_transaction().commit()
break
except ConflictError:
wconflicts += 1
get_transaction().abort()
continue
finally:
wcommit += time.time() - t
ccommit += time.clock() - c
finally:
connection.close()
return start, wcomp, ccomp, rconflicts, wconflicts, wcommit, ccommit, r
def run1(tid, db, factory, job, args):
(start, wcomp, ccomp, rconflicts, wconflicts, wcommit, ccommit, r
) = do(db, job, args)
start = "%.4d-%.2d-%.2d %.2d:%.2d:%.2d" % time.localtime(start)[:6]
print "%s %s %8.3g %8.3g %s %s\t%8.3g %8.3g %s %r" % (
start, tid, wcomp, ccomp, rconflicts, wconflicts, wcommit, ccommit,
factory.__name__, r)
def run(jobs, tid=''):
import Zope
while 1:
factory, job, args, repeatp = jobs.next()
run1(tid, Zope.DB, factory, job, args)
if repeatp:
while 1:
i = random.randint(0,100)
if i > repeatp:
break
run1(tid, Zope.DB, factory, job, args)
def index(connection, messages, catalog):
app = connection.root()['Application']
for message in messages:
mail = mailfolder(app, message.mbox, message.number)
docid = 'm'+str(message.number)
mail.manage_addDTMLDocument(docid, file=message.body)
# increment the mailbox's message counter
getattr(app, message.mbox).length.change(1)
doc = mail[docid]
for h in message.headers:
h = h.strip()
l = h.find(':')
if l <= 0:
continue
name = h[:l].lower()
if name=='subject':
name='title'
v = h[l+1:].strip()
type='string'
if name=='title':
doc.manage_changeProperties(title=h)
else:
try:
doc.manage_addProperty(name, v, type)
except:
pass
if catalog:
app.cat.catalog_object(doc)
return message.number
class IndexJob:
needs_mbox = 1
catalog = 1
prefix = 'index'
def __init__(self, mbox, number=1):
self.__name__ = "%s%s_%s" % (self.prefix, number, mbox.__name__)
self.mbox, self.number = mbox, int(number)
def create(self):
messages = [self.mbox.next() for i in range(self.number)]
return index, (messages, self.catalog)
class InsertJob(IndexJob):
catalog = 0
prefix = 'insert'
wordre = re.compile(r'(\w{3,20})')
stop = 'and', 'not'
def edit(connection, mbox, catalog=1):
app = connection.root()['Application']
mail = getattr(app, mbox.__name__, None)
if mail is None:
time.sleep(1)
return "No mailbox %s" % mbox.__name__
nmessages = mail.length()
if nmessages < 2:
time.sleep(1)
return "No messages to edit in %s" % mbox.__name__
# find a message to edit:
while 1:
number = random.randint(1, nmessages-1)
did = 'm' + str(number)
mail = mailfolder(app, mbox.__name__, number)
doc = getattr(mail, did, None)
if doc is not None:
break
text = doc.raw.split()
norig = len(text)
if norig > 10:
ndel = int(math.exp(random.randint(0, int(math.log(norig)))))
nins = int(math.exp(random.randint(0, int(math.log(norig)))))
else:
ndel = 0
nins = 10
for j in range(ndel):
j = random.randint(0,len(text)-1)
word = text[j]
m = wordre.search(word)
if m:
word = m.group(1).lower()
if (not wordsd.has_key(word)) and word not in stop:
words.append(word)
wordsd[word] = 1
del text[j]
for j in range(nins):
word = random.choice(words)
text.append(word)
doc.raw = ' '.join(text)
if catalog:
app.cat.catalog_object(doc)
return norig, ndel, nins
class EditJob:
needs_mbox = 1
prefix = 'edit'
catalog = 1
def __init__(self, mbox):
self.__name__ = "%s_%s" % (self.prefix, mbox.__name__)
self.mbox = mbox
def create(self):
return edit, (self.mbox, self.catalog)
class ModifyJob(EditJob):
prefix = 'modify'
catalog = 0
def search(connection, terms, number):
app = connection.root()['Application']
cat = app.cat
n = 0
for i in number:
term = random.choice(terms)
results = cat(PrincipiaSearchSource=term)
n += len(results)
for result in results:
did = result.getObject().getId()
return n
class SearchJob:
def __init__(self, terms='', number=10):
if terms:
terms = terms.split()
self.__name__ = "search_" + '_'.join(terms)
self.terms = terms
else:
self.__name__ = 'search'
self.terms = words
number = min(int(number), len(self.terms))
self.number = range(number)
def create(self):
return search, (self.terms, self.number)
words=['banishment', 'indirectly', 'imprecise', 'peeks',
'opportunely', 'bribe', 'sufficiently', 'Occidentalized', 'elapsing',
'fermenting', 'listen', 'orphanage', 'younger', 'draperies', 'Ida',
'cuttlefish', 'mastermind', 'Michaels', 'populations', 'lent',
'cater', 'attentional', 'hastiness', 'dragnet', 'mangling',
'scabbards', 'princely', 'star', 'repeat', 'deviation', 'agers',
'fix', 'digital', 'ambitious', 'transit', 'jeeps', 'lighted',
'Prussianizations', 'Kickapoo', 'virtual', 'Andrew', 'generally',
'boatsman', 'amounts', 'promulgation', 'Malay', 'savaging',
'courtesan', 'nursed', 'hungered', 'shiningly', 'ship', 'presides',
'Parke', 'moderns', 'Jonas', 'unenlightening', 'dearth', 'deer',
'domesticates', 'recognize', 'gong', 'penetrating', 'dependents',
'unusually', 'complications', 'Dennis', 'imbalances', 'nightgown',
'attached', 'testaments', 'congresswoman', 'circuits', 'bumpers',
'braver', 'Boreas', 'hauled', 'Howe', 'seethed', 'cult', 'numismatic',
'vitality', 'differences', 'collapsed', 'Sandburg', 'inches', 'head',
'rhythmic', 'opponent', 'blanketer', 'attorneys', 'hen', 'spies',
'indispensably', 'clinical', 'redirection', 'submit', 'catalysts',
'councilwoman', 'kills', 'topologies', 'noxious', 'exactions',
'dashers', 'balanced', 'slider', 'cancerous', 'bathtubs', 'legged',
'respectably', 'crochets', 'absenteeism', 'arcsine', 'facility',
'cleaners', 'bobwhite', 'Hawkins', 'stockade', 'provisional',
'tenants', 'forearms', 'Knowlton', 'commit', 'scornful',
'pediatrician', 'greets', 'clenches', 'trowels', 'accepts',
'Carboloy', 'Glenn', 'Leigh', 'enroll', 'Madison', 'Macon', 'oiling',
'entertainingly', 'super', 'propositional', 'pliers', 'beneficiary',
'hospitable', 'emigration', 'sift', 'sensor', 'reserved',
'colonization', 'shrilled', 'momentously', 'stevedore', 'Shanghaiing',
'schoolmasters', 'shaken', 'biology', 'inclination', 'immoderate',
'stem', 'allegory', 'economical', 'daytime', 'Newell', 'Moscow',
'archeology', 'ported', 'scandals', 'Blackfoot', 'leery', 'kilobit',
'empire', 'obliviousness', 'productions', 'sacrificed', 'ideals',
'enrolling', 'certainties', 'Capsicum', 'Brookdale', 'Markism',
'unkind', 'dyers', 'legislates', 'grotesquely', 'megawords',
'arbitrary', 'laughing', 'wildcats', 'thrower', 'sex', 'devils',
'Wehr', 'ablates', 'consume', 'gossips', 'doorways', 'Shari',
'advanced', 'enumerable', 'existentially', 'stunt', 'auctioneers',
'scheduler', 'blanching', 'petulance', 'perceptibly', 'vapors',
'progressed', 'rains', 'intercom', 'emergency', 'increased',
'fluctuating', 'Krishna', 'silken', 'reformed', 'transformation',
'easter', 'fares', 'comprehensible', 'trespasses', 'hallmark',
'tormenter', 'breastworks', 'brassiere', 'bladders', 'civet', 'death',
'transformer', 'tolerably', 'bugle', 'clergy', 'mantels', 'satin',
'Boswellizes', 'Bloomington', 'notifier', 'Filippo', 'circling',
'unassigned', 'dumbness', 'sentries', 'representativeness', 'souped',
'Klux', 'Kingstown', 'gerund', 'Russell', 'splices', 'bellow',
'bandies', 'beefers', 'cameramen', 'appalled', 'Ionian', 'butterball',
'Portland', 'pleaded', 'admiringly', 'pricks', 'hearty', 'corer',
'deliverable', 'accountably', 'mentors', 'accorded',
'acknowledgement', 'Lawrenceville', 'morphology', 'eucalyptus',
'Rena', 'enchanting', 'tighter', 'scholars', 'graduations', 'edges',
'Latinization', 'proficiency', 'monolithic', 'parenthesizing', 'defy',
'shames', 'enjoyment', 'Purdue', 'disagrees', 'barefoot', 'maims',
'flabbergast', 'dishonorable', 'interpolation', 'fanatics', 'dickens',
'abysses', 'adverse', 'components', 'bowl', 'belong', 'Pipestone',
'trainees', 'paw', 'pigtail', 'feed', 'whore', 'conditioner',
'Volstead', 'voices', 'strain', 'inhabits', 'Edwin', 'discourses',
'deigns', 'cruiser', 'biconvex', 'biking', 'depreciation', 'Harrison',
'Persian', 'stunning', 'agar', 'rope', 'wagoner', 'elections',
'reticulately', 'Cruz', 'pulpits', 'wilt', 'peels', 'plants',
'administerings', 'deepen', 'rubs', 'hence', 'dissension', 'implored',
'bereavement', 'abyss', 'Pennsylvania', 'benevolent', 'corresponding',
'Poseidon', 'inactive', 'butchers', 'Mach', 'woke', 'loading',
'utilizing', 'Hoosier', 'undo', 'Semitization', 'trigger', 'Mouthe',
'mark', 'disgracefully', 'copier', 'futility', 'gondola', 'algebraic',
'lecturers', 'sponged', 'instigators', 'looted', 'ether', 'trust',
'feeblest', 'sequencer', 'disjointness', 'congresses', 'Vicksburg',
'incompatibilities', 'commend', 'Luxembourg', 'reticulation',
'instructively', 'reconstructs', 'bricks', 'attache', 'Englishman',
'provocation', 'roughen', 'cynic', 'plugged', 'scrawls', 'antipode',
'injected', 'Daedalus', 'Burnsides', 'asker', 'confronter',
'merriment', 'disdain', 'thicket', 'stinker', 'great', 'tiers',
'oust', 'antipodes', 'Macintosh', 'tented', 'packages',
'Mediterraneanize', 'hurts', 'orthodontist', 'seeder', 'readying',
'babying', 'Florida', 'Sri', 'buckets', 'complementary',
'cartographer', 'chateaus', 'shaves', 'thinkable', 'Tehran',
'Gordian', 'Angles', 'arguable', 'bureau', 'smallest', 'fans',
'navigated', 'dipole', 'bootleg', 'distinctive', 'minimization',
'absorbed', 'surmised', 'Malawi', 'absorbent', 'close', 'conciseness',
'hopefully', 'declares', 'descent', 'trick', 'portend', 'unable',
'mildly', 'Morse', 'reference', 'scours', 'Caribbean', 'battlers',
'astringency', 'likelier', 'Byronizes', 'econometric', 'grad',
'steak', 'Austrian', 'ban', 'voting', 'Darlington', 'bison', 'Cetus',
'proclaim', 'Gilbertson', 'evictions', 'submittal', 'bearings',
'Gothicizer', 'settings', 'McMahon', 'densities', 'determinants',
'period', 'DeKastere', 'swindle', 'promptness', 'enablers', 'wordy',
'during', 'tables', 'responder', 'baffle', 'phosgene', 'muttering',
'limiters', 'custodian', 'prevented', 'Stouffer', 'waltz', 'Videotex',
'brainstorms', 'alcoholism', 'jab', 'shouldering', 'screening',
'explicitly', 'earner', 'commandment', 'French', 'scrutinizing',
'Gemma', 'capacitive', 'sheriff', 'herbivore', 'Betsey', 'Formosa',
'scorcher', 'font', 'damming', 'soldiers', 'flack', 'Marks',
'unlinking', 'serenely', 'rotating', 'converge', 'celebrities',
'unassailable', 'bawling', 'wording', 'silencing', 'scotch',
'coincided', 'masochists', 'graphs', 'pernicious', 'disease',
'depreciates', 'later', 'torus', 'interject', 'mutated', 'causer',
'messy', 'Bechtel', 'redundantly', 'profoundest', 'autopsy',
'philosophic', 'iterate', 'Poisson', 'horridly', 'silversmith',
'millennium', 'plunder', 'salmon', 'missioner', 'advances', 'provers',
'earthliness', 'manor', 'resurrectors', 'Dahl', 'canto', 'gangrene',
'gabler', 'ashore', 'frictionless', 'expansionism', 'emphasis',
'preservations', 'Duane', 'descend', 'isolated', 'firmware',
'dynamites', 'scrawled', 'cavemen', 'ponder', 'prosperity', 'squaw',
'vulnerable', 'opthalmic', 'Simms', 'unite', 'totallers', 'Waring',
'enforced', 'bridge', 'collecting', 'sublime', 'Moore', 'gobble',
'criticizes', 'daydreams', 'sedate', 'apples', 'Concordia',
'subsequence', 'distill', 'Allan', 'seizure', 'Isadore', 'Lancashire',
'spacings', 'corresponded', 'hobble', 'Boonton', 'genuineness',
'artifact', 'gratuities', 'interviewee', 'Vladimir', 'mailable',
'Bini', 'Kowalewski', 'interprets', 'bereave', 'evacuated', 'friend',
'tourists', 'crunched', 'soothsayer', 'fleetly', 'Romanizations',
'Medicaid', 'persevering', 'flimsy', 'doomsday', 'trillion',
'carcasses', 'guess', 'seersucker', 'ripping', 'affliction',
'wildest', 'spokes', 'sheaths', 'procreate', 'rusticates', 'Schapiro',
'thereafter', 'mistakenly', 'shelf', 'ruination', 'bushel',
'assuredly', 'corrupting', 'federation', 'portmanteau', 'wading',
'incendiary', 'thing', 'wanderers', 'messages', 'Paso', 'reexamined',
'freeings', 'denture', 'potting', 'disturber', 'laborer', 'comrade',
'intercommunicating', 'Pelham', 'reproach', 'Fenton', 'Alva', 'oasis',
'attending', 'cockpit', 'scout', 'Jude', 'gagging', 'jailed',
'crustaceans', 'dirt', 'exquisitely', 'Internet', 'blocker', 'smock',
'Troutman', 'neighboring', 'surprise', 'midscale', 'impart',
'badgering', 'fountain', 'Essen', 'societies', 'redresses',
'afterwards', 'puckering', 'silks', 'Blakey', 'sequel', 'greet',
'basements', 'Aubrey', 'helmsman', 'album', 'wheelers', 'easternmost',
'flock', 'ambassadors', 'astatine', 'supplant', 'gird', 'clockwork',
'foxes', 'rerouting', 'divisional', 'bends', 'spacer',
'physiologically', 'exquisite', 'concerts', 'unbridled', 'crossing',
'rock', 'leatherneck', 'Fortescue', 'reloading', 'Laramie', 'Tim',
'forlorn', 'revert', 'scarcer', 'spigot', 'equality', 'paranormal',
'aggrieves', 'pegs', 'committeewomen', 'documented', 'interrupt',
'emerald', 'Battelle', 'reconverted', 'anticipated', 'prejudices',
'drowsiness', 'trivialities', 'food', 'blackberries', 'Cyclades',
'tourist', 'branching', 'nugget', 'Asilomar', 'repairmen', 'Cowan',
'receptacles', 'nobler', 'Nebraskan', 'territorial', 'chickadee',
'bedbug', 'darted', 'vigilance', 'Octavia', 'summands', 'policemen',
'twirls', 'style', 'outlawing', 'specifiable', 'pang', 'Orpheus',
'epigram', 'Babel', 'butyrate', 'wishing', 'fiendish', 'accentuate',
'much', 'pulsed', 'adorned', 'arbiters', 'counted', 'Afrikaner',
'parameterizes', 'agenda', 'Americanism', 'referenda', 'derived',
'liquidity', 'trembling', 'lordly', 'Agway', 'Dillon', 'propellers',
'statement', 'stickiest', 'thankfully', 'autograph', 'parallel',
'impulse', 'Hamey', 'stylistic', 'disproved', 'inquirer', 'hoisting',
'residues', 'variant', 'colonials', 'dequeued', 'especial', 'Samoa',
'Polaris', 'dismisses', 'surpasses', 'prognosis', 'urinates',
'leaguers', 'ostriches', 'calculative', 'digested', 'divided',
'reconfigurer', 'Lakewood', 'illegalities', 'redundancy',
'approachability', 'masterly', 'cookery', 'crystallized', 'Dunham',
'exclaims', 'mainline', 'Australianizes', 'nationhood', 'pusher',
'ushers', 'paranoia', 'workstations', 'radiance', 'impedes',
'Minotaur', 'cataloging', 'bites', 'fashioning', 'Alsop', 'servants',
'Onondaga', 'paragraph', 'leadings', 'clients', 'Latrobe',
'Cornwallis', 'excitingly', 'calorimetric', 'savior', 'tandem',
'antibiotics', 'excuse', 'brushy', 'selfish', 'naive', 'becomes',
'towers', 'popularizes', 'engender', 'introducing', 'possession',
'slaughtered', 'marginally', 'Packards', 'parabola', 'utopia',
'automata', 'deterrent', 'chocolates', 'objectives', 'clannish',
'aspirin', 'ferociousness', 'primarily', 'armpit', 'handfuls',
'dangle', 'Manila', 'enlivened', 'decrease', 'phylum', 'hardy',
'objectively', 'baskets', 'chaired', 'Sepoy', 'deputy', 'blizzard',
'shootings', 'breathtaking', 'sticking', 'initials', 'epitomized',
'Forrest', 'cellular', 'amatory', 'radioed', 'horrified', 'Neva',
'simultaneous', 'delimiter', 'expulsion', 'Himmler', 'contradiction',
'Remus', 'Franklinizations', 'luggage', 'moisture', 'Jews',
'comptroller', 'brevity', 'contradictions', 'Ohio', 'active',
'babysit', 'China', 'youngest', 'superstition', 'clawing', 'raccoons',
'chose', 'shoreline', 'helmets', 'Jeffersonian', 'papered',
'kindergarten', 'reply', 'succinct', 'split', 'wriggle', 'suitcases',
'nonce', 'grinders', 'anthem', 'showcase', 'maimed', 'blue', 'obeys',
'unreported', 'perusing', 'recalculate', 'rancher', 'demonic',
'Lilliputianize', 'approximation', 'repents', 'yellowness',
'irritates', 'Ferber', 'flashlights', 'booty', 'Neanderthal',
'someday', 'foregoes', 'lingering', 'cloudiness', 'guy', 'consumer',
'Berkowitz', 'relics', 'interpolating', 'reappearing', 'advisements',
'Nolan', 'turrets', 'skeletal', 'skills', 'mammas', 'Winsett',
'wheelings', 'stiffen', 'monkeys', 'plainness', 'braziers', 'Leary',
'advisee', 'jack', 'verb', 'reinterpret', 'geometrical', 'trolleys',
'arboreal', 'overpowered', 'Cuzco', 'poetical', 'admirations',
'Hobbes', 'phonemes', 'Newsweek', 'agitator', 'finally', 'prophets',
'environment', 'easterners', 'precomputed', 'faults', 'rankly',
'swallowing', 'crawl', 'trolley', 'spreading', 'resourceful', 'go',
'demandingly', 'broader', 'spiders', 'Marsha', 'debris', 'operates',
'Dundee', 'alleles', 'crunchier', 'quizzical', 'hanging', 'Fisk']
wordsd = {}
for word in words:
wordsd[word] = 1
def collect_options(args, jobs, options):
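# Parse the command-line job definitions described in the module
# docstring: '-options file' recursively loads more options, value
# options and '-setup' fill the `options` dict, and '-<job>' followed by
# name=value pairs appends (job, kw, frequency, sleep, repeat) tuples to
# `jobs`.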
while args:
arg = args.pop(0)
if arg.startswith('-'):
name = arg[1:]
if name == 'options':
fname = args.pop(0)
d = {}
execfile(fname, d)
collect_options(list(d['options']), jobs, options)
elif options.has_key(name):
v = args.pop(0)
if options[name] != None:
raise ValueError(
"Duplicate values for %s, %s and %s"
% (name, v, options[name])
)
options[name] = v
elif name == 'setup':
options['setup'] = 1
elif globals().has_key(name.capitalize()+'Job'):
job = name
kw = {}
while args and args[0].find("=") > 0:
arg = args.pop(0).split('=')
name, v = arg[0], '='.join(arg[1:])
if kw.has_key(name):
raise ValueError(
"Duplicate parameter %s for job %s"
% (name, job)
)
kw[name]=v
if kw.has_key('frequency'):
frequency = kw['frequency']
del kw['frequency']
else:
frequency = 1
if kw.has_key('sleep'):
sleep = float(kw['sleep'])
del kw['sleep']
else:
sleep = 0.0001
if kw.has_key('repeat'):
repeatp = float(kw['repeat'])
del kw['repeat']
else:
repeatp = 0
jobs.append((job, kw, frequency, sleep, repeatp))
else:
raise ValueError("not an option or job", name)
else:
raise ValueError("Expected an option", arg)
def find_lib_python():
for b in os.getcwd(), os.path.split(sys.argv[0])[0]:
for i in range(6):
d = ['..']*i + ['lib', 'python']
p = os.path.join(b, *d)
if os.path.isdir(p):
return p
raise ValueError("Couldn't find lib/python")
def main(args=None):
lib_python = find_lib_python()
sys.path.insert(0, lib_python)
if args is None:
args = sys.argv[1:]
if not args:
print __doc__
sys.exit(0)
print args
random.seed(hash(tuple(args))) # always use the same for the given args
options = {"mbox": None, "threads": None}
jobdefs = []
collect_options(args, jobdefs, options)
mboxes = {}
if options["mbox"]:
mboxes[options["mbox"]] = MBox(options["mbox"])
if options.has_key('setup'):
setup(lib_python)
else:
import Zope
Zope.startup()
#from ThreadedAsync.LoopCallback import loop
#threading.Thread(target=loop, args=(), name='asyncore').start()
jobs = JobProducer()
for job, kw, frequency, sleep, repeatp in jobdefs:
Job = globals()[job.capitalize()+'Job']
if getattr(Job, 'needs_mbox', 0):
if not kw.has_key("mbox"):
if not options["mbox"]:
raise ValueError(
"no mailbox (mbox option) file specified")
kw['mbox'] = mboxes[options["mbox"]]
else:
if not mboxes.has_key(kw["mbox"]):
mboxes[kw['mbox']] = MBox(kw['mbox'])
kw["mbox"] = mboxes[kw['mbox']]
jobs.add(Job(**kw), frequency, sleep, repeatp)
if not jobs:
print "No jobs to execute"
return
threads = int(options['threads'] or '0')
if threads > 1:
threads = [threading.Thread(target=run, args=(jobs, i), name=str(i))
for i in range(threads)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
run(jobs)
if __name__ == '__main__':
main()