Commit 81246863 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.

Please make all future changes on the Zope-2_7-branch instead.
parent 0f6d89db
......@@ -21,7 +21,7 @@ static char ExtensionClass_module_documentation[] =
" - They provide access to unbound methods,\n"
" - They can be called to create instances.\n"
"\n"
"$Id: ExtensionClass.c,v 1.60 2003/08/15 17:27:58 tim_one Exp $\n"
"$Id: ExtensionClass.c,v 1.61 2003/09/15 16:29:20 jeremy Exp $\n"
;
#include "ExtensionClass.h"
......
......@@ -14,7 +14,7 @@
"""Berkeley storage with full undo and versioning support.
$Revision: 1.72 $
$Revision: 1.73 $
"""
import time
......@@ -235,7 +235,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# Tables to support packing.
self._objrevs = self._setupDB('objrevs', db.DB_DUP)
self._packmark = self._setupDB('packmark')
self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 8)
self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 16)
self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
# Do recovery and consistency checks
self._withlock(self._dorecovery)
......@@ -247,14 +247,6 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
elif version <> BDBFULL_SCHEMA_VERSION:
raise POSException.StorageSystemError, 'incompatible storage version'
def _make_autopacker(self, event):
config = self._config
lastpacktime = U64(self._last_packtime())
return _Autopack(
self, event,
config.frequency, config.packtime, config.classicpack,
lastpacktime)
def _dorecovery(self):
# If these tables are non-empty, it means we crashed during a pack
# operation. I think we can safely throw out this data since the next
......@@ -282,8 +274,10 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
flag = self._pending.get(tid)
assert flag in (ABORT, COMMIT)
if flag == ABORT:
self.log('aborting pending transaction %r', tid)
self._withtxn(self._doabort, tid)
else:
self.log('recovering pending transaction %r', tid)
self._withtxn(self._docommit, tid)
# Initialize our cache of the next available version id.
c = self._versions.cursor()
......@@ -295,9 +289,9 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# Convert to a Python long integer. Note that cursor.last()
# returns key/value, and we want the key (which for the
# versions table is the vid).
self.__nextvid = U64(rec[0])
self._nextvid = U64(rec[0])
else:
self.__nextvid = 0L
self._nextvid = 0L
# Initialize the last transaction
c = self._txnoids.cursor()
try:
......@@ -305,13 +299,18 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
finally:
c.close()
if rec:
self.__ltid = rec[0]
self._ltid = rec[0]
else:
self.__ltid = ZERO
self._ltid = ZERO
def _make_autopacker(self, event):
config = self._config
return _Autopack(self, event,
config.frequency, config.packtime, config.gcpack)
def _doabort(self, txn, tid):
# First clean up the oid indexed (or oid+tid indexed) tables.
co = cs = ct = cv = None
co = cs = ct = cv = cr = None
try:
co = self._oids.cursor(txn=txn)
cs = self._serials.cursor(txn=txn)
......@@ -338,6 +337,9 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
revid = oid+tid
vid = self._metadata[revid][:8]
self._metadata.delete(revid, txn=txn)
# If the transaction performed an operation that did not
# write a new pickle, e.g. a version operation or undo,
# there will be metadata but no pickle.
try:
self._pickles.delete(revid, txn=txn)
except db.DBNotFoundError:
......@@ -442,7 +444,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
# Now incref all the object refcounts
# Now incref all references
for oid, delta in deltas.items():
refcount = self._refcounts.get(oid, ZERO, txn=txn)
self._refcounts.put(oid, incr(refcount, delta), txn=txn)
......@@ -450,21 +452,9 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
self._pvids.truncate(txn)
self._prevrevids.truncate(txn)
self._pending.truncate(txn)
# If we're in the middle of a pack, we need to add to the packmark
# table any objects that were modified in this transaction.
# Otherwise, there's a race condition where mark might have happened,
# then the object is added, then sweep runs, deleting the object
# created in the interrim.
if self._packing:
for oid in self._oids.keys():
self._packmark.put(oid, PRESENT, txn=txn)
self._oids.truncate(txn)
def _dobegin(self, txn, tid, u, d, e):
# When a transaction begins, we set the pending flag to ABORT,
# meaning, if we crash between now and the time we vote, all changes
# will be aborted.
#
# It's more convenient to store the transaction metadata now, rather
# than in the _finish() call. Doesn't matter because if the ZODB
# transaction were to abort, we'd clean this up anyway.
......@@ -472,6 +462,9 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
desclen = len(d)
lengths = pack('>II', userlen, desclen)
data = lengths + u + d + e
# When a transaction begins, we set the pending flag to ABORT,
# meaning, if we crash between now and the time we vote, all changes
# will be aborted.
self._pending.put(tid, ABORT, txn=txn)
self._txnMetadata.put(tid, data, txn=txn)
......@@ -480,7 +473,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
def _finish(self, tid, u, d, e):
self._withtxn(self._docommit, self._serial)
self.__ltid = tid
self._ltid = tid
#
# Storing an object revision in a transaction
......@@ -539,7 +532,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# The non-version revid is the same as for the previous
# revision of the object.
nvrevid = onvrevid
# Now store optimistically data to all the tables
# Now optimistically store data to all the tables
newserial = self._serial
revid = oid + newserial
self._serials.put(oid, newserial, txn=txn)
......@@ -562,6 +555,8 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
def store(self, oid, serial, data, version, transaction):
# Lock and transaction wrapper
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
......@@ -683,9 +678,9 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# progress gets aborted.
vid = self._vids.get(version)
if vid is None:
self.__nextvid += 1
self._nextvid += 1
# Convert the version id into an 8-byte string
vid = p64(self.__nextvid)
vid = p64(self._nextvid)
# Now update the vids/versions tables, along with the log table
self._vids.put(version, vid, txn=txn)
self._versions.put(vid, version, txn=txn)
......@@ -718,7 +713,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
rec = c.next()
continue
# This object was modified
rtnoids[oid] = 1
rtnoids[oid] = True
# Calculate the values for the new transaction metadata
serial, tid = self._getSerialAndTid(oid)
meta = self._metadata[oid+tid]
......@@ -792,7 +787,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
if not dest:
dvid = ZERO
else:
# Find the vid for the dest version, or create on eif necessary.
# Find the vid for the dest version, or create one if necessary.
dvid = self._findcreatevid(dest, txn)
c = self._currentVersions.cursor(txn)
try:
......@@ -812,14 +807,14 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
rec = c.next()
continue
# This object was modified
rtnoids[oid] = 1
rtnoids[oid] = True
# Calculate the values for the new transaction metadata
serial, tid = self._getSerialAndTid(oid)
meta = self._metadata[oid+tid]
curvid, nvrevid, lrevid = unpack('>8s8s8s', meta[:24])
assert curvid == svid
# If we're committing to the non-version, then the nvrevid
# ougt to be ZERO too, regardless of what it was for the
# ought to be ZERO too, regardless of what it was for the
# source version.
if not dest:
nvrevid = ZERO
......@@ -893,10 +888,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
vid = self._vids.get(version, missing)
if vid is missing:
return True
if self._currentVersions.has_key(vid):
return False
else:
return True
return not self._currentVersions.has_key(vid)
finally:
self._lock_release()
......@@ -949,7 +941,8 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
if vid == ZERO or self._versions.get(vid) == version:
return self._pickles[oid+lrevid], serial
# The object was living in a version, but not the one requested.
# Semantics here are to return the non-version revision.
# Semantics here are to return the non-version revision. Allow
# KeyErrors to percolate up (meaning there's no non-version rev).
lrevid = self._metadata[oid+nvrevid][16:24]
return self._pickles[oid+lrevid], nvrevid
finally:
......@@ -1066,13 +1059,16 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
def lastTransaction(self):
"""Return transaction id for last committed transaction"""
return self.__ltid
return self._ltid
#
# Transactional undo
#
def _undo_current_tid(self, oid, ctid):
# Returns (oid, metadata record, None). The last represents the data
# which will always be None because there's no conflict resolution
# necessary.
vid, nvrevid, lrevid, prevrevid = unpack(
'>8s8s8s8s', self._metadata[oid+ctid])
# We can always undo the last transaction. The prevrevid pointer
......@@ -1092,9 +1088,8 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
return oid, vid+nvrevid+DNE+ctid, None
# BAW: If the serial number of this object record is the same as
# the serial we're being asked to undo, then I think we have a
# problem (since the storage invariant is that it doesn't retain
# metadata records for multiple modifications of the object in the
# same transaction).
# problem (since the storage invariant is that it retains only one
# metadata record per object revision).
assert mrec[0][8:] <> ctid, 'storage invariant violated'
# All is good, so just restore this metadata record
return oid, mrec[1], None
......@@ -1102,6 +1097,9 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
mdc.close()
def _undo_to_same_pickle(self, oid, tid, ctid):
# Returns (oid, metadata record, data). Data always be None unless
# conflict resolution was necessary and succeeded.
#
# We need to compare the lrevid (pickle pointers) of the transaction
# previous to the current one, and the transaction previous to the one
# we want to undo. If their lrevids are the same, it's undoable
......@@ -1115,9 +1113,9 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
vid, nvrevid = unpack('>8s8s', self._metadata[oid+tid][:16])
return oid, vid+nvrevid+DNE+ctid, None
elif target_prevrevid == ZERO or last_prevrevid == ZERO:
# The object's revision is in it's initial creation state but
# we're asking for an undo of something other than the initial
# creation state. No, no.
# The object's revision is in its initial creation state but we're
# asking for an undo of something other than the initial creation
# state. No, no.
raise POSException.UndoError(
'Undoing mismatched zombification', oid)
last_lrevid = self._metadata[oid+last_prevrevid][16:24]
......@@ -1125,20 +1123,25 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
target_lrevid = target_metadata[16:24]
# If the pickle pointers of the object's last revision and the
# undo-target revision are the same, then the transaction can be
# undone. Note that we take a short cut here, since we really want to
# test pickle equality, but this is good enough for now.
# undone. Note that we cannot test for pickle equality here because
# that would allow us to undo to an arbitrary object history. Imagine
# a boolean object -- if undo tested for equality and not identity,
# then half the time we could undo to an arbitrary point in the
# object's history.
if target_lrevid == last_lrevid:
return oid, target_metadata, None
# Check previous transactionalUndos done in this transaction
# Check previous transactional undos done in this transaction
elif target_lrevid == self._prevrevids.get(oid):
return oid, target_metadata, None
else:
# Attempt application level conflict resolution
try:
data = self.tryToResolveConflict(
oid, ctid, tid, self._pickles[oid+target_lrevid])
except ConflictError:
raise POSException.UndoError, 'Cannot undo transaction'
if data:
return oid, target_metadata, data
else:
raise POSException.UndoError('Cannot undo transaction', oid)
def _dotxnundo(self, txn, tid):
......@@ -1184,10 +1187,10 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
self._pickles.put(revid, data, txn=txn)
metadata = vid+nvrevid+newserial+prevrevid
# We need to write all the new records for an object changing in
# this transaction. Note that we only write to th serials table
# this transaction. Note that we only write to the serials table
# if prevrevids hasn't already seen this object, otherwise we'll
# end up with multiple entries in the serials table for the same
# tid.
# object revision.
if not self._prevrevids.has_key(oid):
self._serials.put(oid, newserial, txn=txn)
self._metadata.put(revid, metadata, txn=txn)
......@@ -1198,7 +1201,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
if vid <> ZERO:
self._currentVersions.put(vid, revid, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
rtnoids[oid] = 1
rtnoids[oid] = True
# Add this object revision to the autopack table
self._objrevs.put(newserial+oid, prevrevid, txn=txn)
return rtnoids.keys()
......@@ -1212,6 +1215,21 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
finally:
self._lock_release()
def _unpack_txnmeta(self, txnmeta):
userlen, desclen = unpack('>2I', txnmeta[:8])
user = txnmeta[8:8+userlen]
desc = txnmeta[8+userlen:8+userlen+desclen]
extdata = txnmeta[8+userlen+desclen:]
# ext is a pickled mapping. Any exceptions are ignored, but XXX can
# we (and FileStorage :) do better?
ext = {}
if extdata:
try:
ext = pickle.loads(extdata)
except Exception, e:
self.log('Error unpickling extension data: %s', e)
return user, desc, ext
def _doundolog(self, first, last, filter):
# Get the last packtime
packtime = self._last_packtime()
......@@ -1229,26 +1247,14 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
rec = c.prev()
if tid <= packtime:
break
userlen, desclen = unpack('>II', txnmeta[:8])
user = txnmeta[8:8+userlen]
desc = txnmeta[8+userlen:8+userlen+desclen]
ext = txnmeta[8+userlen+desclen:]
user, desc, ext = self._unpack_txnmeta(txnmeta)
# Create a dictionary for the TransactionDescription
txndesc = {'id' : tid,
'time' : TimeStamp(tid).timeTime(),
'user_name' : user,
'description': desc,
}
# The extension stuff is a picklable mapping, so if we can
# unpickle it, we update the TransactionDescription dictionary
# with that data. BAW: The bare except is disgusting, but I'm
# too lazy to figure out what exceptions could actually be
# raised here...
if ext:
try:
txndesc.update(pickle.loads(ext))
except:
pass
txndesc.update(ext)
# Now call the filter to see if this transaction should be
# added to the return list...
if filter is None or filter(txndesc):
......@@ -1359,6 +1365,20 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# First, the public API for classic pack
def pack(self, t, zreferencesf):
"""Perform a pack on the storage.
There are two forms of packing: incremental and full gc. In an
incremental pack, only old object revisions are removed. In a full gc
pack, cyclic garbage detection and removal is also performed.
t is the pack time. All non-current object revisions older than t
will be removed in an incremental pack.
pack() always performs an incremental pack. If the gc flag is True,
then pack() will also perform a garbage collection. Some storages
(e.g. FileStorage) always do both phases in a pack() call. Such
storages should simply ignore the gc flag.
"""
# For all intents and purposes, referencesf here is always going to be
# the same as ZODB.referencesf.referencesf. It's too much of a PITA
# to pass that around to the helper methods, so just assert they're
......@@ -1381,14 +1401,36 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
self.log('classic pack finished')
def _dopack(self, t, gc=True):
# BAW: should a pack time in the future be a ValueError? When ZEO is
# involved, t could come from a remote machine with a skewed clock.
# Jim wants us to believe t if it's "close", but our algorithm
# requires synchronicity between the calculation of the pack time and
# the timestamps used in serial numbers.
#
# If a transaction is currently in progress, wait for it to finish
# before calculating the pack time, by acquiring the commit lock.
# This guarantees that the next transaction begins after the pack
# time so that any objects added in that transaction will have a
# serial number greater than the pack time. Such objects will be
# completely ignored for packing purposes.
#
# If we don't do this, then it would be possible for some of the
# current transaction's objects to have been stored with a serial
# number earlier than the pack time, but not yet linked to the root.
# Say that thread 1 starts a transaction, and then thread 2 starts a
# pack. Thread 2 then marks the root-reachable objects, but before
# sweeping, object B is stored by thread 1. If the object linking B
# to the root hasn't been stored by the time of the sweep, B will be
# collected as garbage.
#
# t is a TimeTime, or time float, convert this to a TimeStamp object,
# using an algorithm similar to what's used in FileStorage. We know
# that our transaction ids, a.k.a. revision ids, are timestamps.
#
# BAW: should a pack time in the future be a ValueError? We'd have to
# worry about clock skew, so for now, we just set the pack time to the
# minimum of t and now.
self._commit_lock_acquire()
try:
packtime = min(t, time.time())
finally:
self._commit_lock_release()
t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
packtid = `t0`
# Collect all revisions of all objects earlier than the pack time.
......@@ -1473,14 +1515,12 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# with it again. Otherwise, we can remove the metadata record
# for this revision and decref the corresponding pickle.
if oldserial <> ZERO:
orevid = oid+oldserial
# It's possible this object revision has already been
# deleted, if the oid points to a decref'd away object
try:
metadata = self._metadata[oid+oldserial]
except KeyError:
pass
else:
self._metadata.delete(oid+oldserial, txn=txn)
if self._metadata.has_key(orevid):
metadata = self._metadata[orevid]
self._metadata.delete(orevid, txn=txn)
# Decref the pickle
self._decrefPickle(oid, metadata[16:24], txn)
try:
......@@ -1505,19 +1545,19 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
if lrevid == DNE:
# There is no pickle data
return
key = oid + lrevid
refcount = U64(self._pickleRefcounts.get(key, ZERO)) - 1
revid = oid + lrevid
refcount = U64(self._pickleRefcounts.get(revid, ZERO)) - 1
assert refcount >= 0
if refcount == 0:
# We can collect this pickle
self._pickleRefcounts.delete(key, txn=txn)
data = self._pickles[key]
self._pickles.delete(key, txn=txn)
self._pickleRefcounts.delete(revid, txn=txn)
data = self._pickles[revid]
self._pickles.delete(revid, txn=txn)
deltas = {}
self._update(deltas, data, -1)
self._decref(deltas, txn)
else:
self._pickleRefcounts.put(key, p64(refcount), txn=txn)
self._pickleRefcounts.put(revid, p64(refcount), txn=txn)
def _decref(self, deltas, txn):
for oid, delta in deltas.items():
......@@ -1605,7 +1645,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# BAW: Maybe this could probably be more efficient by not doing so
# much searching, but it would also be more complicated, so the
# tradeoff should be measured.
serial = None
serial, tid = self._getSerialAndTid(oid)
c = self._metadata.cursor(txn=txn)
try:
rec = c.set_range(oid)
......@@ -1623,9 +1663,52 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
c.close()
return serial
def _rootset(self, packtid, txn):
# Find the root set for reachability purposes. A root set is a tuple
# of oid and tid. First, the current root object as of the pack time
# is always in the root set. Second, any object revision after the
# pack time that has a back pointer (lrevid) to before the pack time
# serves as another root because some future undo could then revive
# any referenced objects. The root set ends up in the oidqueue.
try:
zerorev = self._findrev(ZERO, packtid, txn)
except KeyError:
# There's no root object
return
self._oidqueue.append(ZERO+zerorev, txn)
c = self._txnoids.cursor(txn)
try:
try:
rec = c.set_range(packtid)
except db.DBNotFoundError:
rec = None
while rec:
tid, oid = rec
revid = oid + tid
rec = c.next()
lrevid = self._metadata[revid][16:24]
if lrevid < packtid:
self._oidqueue.append(revid, txn)
finally:
c.close()
# tid is None if all we care about is that any object revision is present.
def _packmark_has(self, oid, tid, txn):
if tid is None:
return self._packmark.has_key(oid)
c = self._packmark.cursor(txn)
try:
try:
c.set_both(oid, tid)
return True
except db.DBNotFoundError:
return False
finally:
c.close()
def _mark(self, txn, packtid):
# Find the oids for all the objects reachable from the root, as of the
# pack time. To reduce the amount of in-core memory we need do do a
# pack time. To reduce the amount of in-core memory we need to do a
# pack operation, we'll save the mark data in the packmark table. The
# oidqueue is a BerkeleyDB Queue that holds the list of object ids to
# look at next, and by using this we don't need to keep an in-memory
......@@ -1634,36 +1717,41 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# Quick exit for empty storages
if not self._serials:
return
# The oid of the object we're looking at, starting at the root
oid = ZERO
# Start at the root, find all the objects the current revision of the
# root references, and then for each of those, find all the objects it
# references, and so on until we've traversed the entire object graph.
while oid:
# Start with the root set, iterating over all reachable objects until
# we've traversed the entire object tree.
self._rootset(packtid, txn)
rec = self._oidqueue.consume(txn)
while rec:
if self._stop:
raise PackStop, 'stopped in _mark()'
if not self._packmark.has_key(oid):
# We haven't seen this object yet
self._packmark.put(oid, PRESENT, txn=txn)
# Get the pickle data for the most current revision of this
# object as of the pack time.
tid = self._findrev(oid, packtid, txn)
revid = rec[1]
oid = revid[:8]
tid = revid[8:]
# See if this revision is already in the packmark
if not self._packmark_has(oid, tid, txn):
# BAW: We are more conservative than FileStorage here, since
# any reference to an object keeps all the object references
# alive. FileStorage will collect individual object
# revisions. I think our way is fine since we'll eventually
# collect everything incrementally anyway, and for Berkeley,
# all object revisions add to the refcount total.
self._packmark.put(oid, tid, txn=txn)
# Say there's no root object (as is the case in some of the
# unit tests), and we're looking up oid ZERO. Then serial
# will be None.
if tid is not None:
lrevid = self._metadata[oid+tid][16:24]
data = self._pickles[oid+lrevid]
# Now get the oids of all the objects referenced by this
# pickle
# object revision
data = self._pickles[oid+lrevid]
refdoids = []
referencesf(data, refdoids)
# And append them to the queue for later
for oid in refdoids:
self._oidqueue.append(oid, txn)
for roid in refdoids:
rtid = self._findrev(roid, tid, txn)
self._oidqueue.append(roid+rtid, txn)
# Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume(txn)
oid = rec and rec[1]
assert len(self._oidqueue) == 0
def _sweep(self, txn, packtid):
......@@ -1684,7 +1772,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
# Otherwise, if packmark (which knows about all the root
# reachable objects) doesn't have a record for this guy, then
# we can zap it. Do so by appending to oidqueue.
if not self._packmark.has_key(oid):
if not self._packmark_has(oid, None, txn):
self._delqueue.append(oid, txn)
finally:
c.close()
......@@ -1722,7 +1810,7 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
rec = c.next()
if rec is None:
raise IndexError
tid, data = rec
tid, txnmeta = rec
# Now unpack the necessary information. Don't impedence match the
# status flag (that's done by the caller).
packtime = self._last_packtime()
......@@ -1730,38 +1818,13 @@ class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
packedp = True
else:
packedp = False
userlen, desclen = unpack('>II', data[:8])
user = data[8:8+userlen]
desc = data[8+userlen:8+userlen+desclen]
ext = data[8+userlen+desclen:]
user, desc, ext = self._unpack_txnmeta(txnmeta)
return tid, packedp, user, desc, ext
finally:
if c:
c.close()
self._lock_release()
def _alltxnoids(self, tid):
self._lock_acquire()
c = self._txnoids.cursor()
try:
oids = []
oidkeys = {}
try:
rec = c.set(tid)
except db.DBNotFoundError:
rec = None
while rec:
# Ignore the key
oid = rec[1]
if not oidkeys.has_key(oid):
oids.append(oid)
oidkeys[oid] = 1
rec = c.next_dup()
return oids
finally:
c.close()
self._lock_release()
# Other interface assertions
def supportsTransactionalUndo(self):
return True
......@@ -1793,6 +1856,7 @@ class _TransactionsIterator(_GetItemBase):
self._stop = stop
self._closed = False
self._first = True
self._iters = []
def __len__(self):
# This is a lie. It's here only for Python 2.1 support for
......@@ -1823,9 +1887,13 @@ class _TransactionsIterator(_GetItemBase):
if self._stop is not None and tid > self._stop:
raise IndexError
self._tid = tid
return _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
it = _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
self._iters.append(it)
return it
def close(self):
for it in self._iters:
it.close()
self._closed = True
......@@ -1863,14 +1931,11 @@ class _RecordsIterator(_GetItemBase):
self.status = ' '
self.user = user
self.description = desc
try:
self._extension = pickle.loads(ext)
except EOFError:
self._extension = {}
# Internal pointer
self._oids = self._storage._alltxnoids(self.tid)
# To make .pop() more efficient
self._oids.reverse()
self._extension = ext
# BAW: touching the storage's private parts!
self._table = self._storage._txnoids
self._cursor = None
self._rec = None
def next(self):
"""Return the ith item in the sequence of record data.
......@@ -1879,10 +1944,42 @@ class _RecordsIterator(_GetItemBase):
IndexError will be raised after all of the items have been
returned.
"""
# Let IndexError percolate up
oid = self._oids.pop()
if self._table is None:
# We already exhausted this iterator
raise IndexError
# Initialize a txnoids cursor and set it to the start of the oids
# touched by this transaction. We do this here to ensure the cursor
# is closed if there are any problems. A hole in this approach is if
# the client never exhausts the iterator. Then I think we have a
# problem because I don't think the environment can be closed if
# there's an open cursor, but you also cannot close the cursor if the
# environment is already closed (core dumps), so an __del__ doesn't
# help a whole lot.
try:
if self._cursor is None:
self._cursor = self._table.cursor()
try:
self._rec = self._cursor.set(self.tid)
except db.DBNotFoundError:
pass
# Cursor exhausted?
if self._rec is None:
self.close()
raise IndexError
oid = self._rec[1]
self._rec = self._cursor.next_dup()
data, version, lrevid = self._storage._loadSerialEx(oid, self.tid)
return _Record(oid, self.tid, version, data, lrevid)
except:
self.close()
raise
def close(self):
if self._cursor:
self._cursor.close()
self._cursor = None
# _table == None means the iterator has been exhausted
self._table = None
......@@ -1910,22 +2007,20 @@ class _Record:
class _Autopack(_WorkThread):
NAME = 'autopacking'
def __init__(self, storage, event,
frequency, packtime, classicpack,
lastpacktime):
def __init__(self, storage, event, frequency, packtime, gcpack):
_WorkThread.__init__(self, storage, event, frequency)
self._packtime = packtime
self._classicpack = classicpack
self._gcpack = gcpack
# Bookkeeping
self._lastclassic = 0
self._lastgc = 0
def _dowork(self):
# Should we do a classic pack this time?
if self._classicpack <= 0:
classicp = False
# Should we do a full gc pack this time?
if self._gcpack <= 0:
dofullgc = False
else:
v = (self._lastclassic + 1) % self._classicpack
self._lastclassic = v
classicp = not v
# Run the autopack phase
self._storage.autopack(time.time() - self._packtime, classicp)
v = (self._lastgc + 1) % self._gcpack
self._lastgc = v
dofullgc = not v
# Run the full gc phase
self._storage.autopack(time.time() - self._packtime, dofullgc)
......@@ -15,9 +15,7 @@
"""Berkeley storage without undo or versioning.
"""
__version__ = '$Revision: 1.30 $'[-2:][0]
from __future__ import nested_scopes
__version__ = '$Revision: 1.31 $'[-2:][0]
from ZODB import POSException
from ZODB.utils import p64, U64
......@@ -31,12 +29,6 @@ ABORT = 'A'
COMMIT = 'C'
PRESENT = 'X'
try:
True, False
except NameError:
True = 1
False = 0
BDBMINIMAL_SCHEMA_VERSION = 'BM01'
......@@ -292,6 +284,8 @@ class BDBMinimalStorage(BerkeleyBase, ConflictResolvingStorage):
return newserial
def store(self, oid, serial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
# We don't support versions
......
......@@ -12,7 +12,7 @@
#
##############################################################################
"""Base class for BerkeleyStorage implementations.
"""Base class for BerkeleyDB-based storage implementations.
"""
import os
......@@ -28,7 +28,7 @@ from BDBStorage import db, ZERO
# BaseStorage provides primitives for lock acquisition and release, and a host
# of other methods, some of which are overridden here, some of which are not.
from ZODB.lock_file import LockFile
from ZODB.lock_file import lock_file
from ZODB.BaseStorage import BaseStorage
from ZODB.referencesf import referencesf
import ThreadLock
......@@ -56,15 +56,26 @@ class PackStop(Exception):
class BerkeleyConfig:
"""Bag of bits for describing various underlying configuration options.
"""Bag of attributes for configuring Berkeley based storages.
Berkeley databases are wildly configurable, and this class exposes some of
that. To customize these options, instantiate one of these classes and
set the attributes below to the desired value. Then pass this instance to
the Berkeley storage constructor, using the `config' keyword argument.
BerkeleyDB stores all its information in an `environment directory'
(modulo log files, which can be in a different directory, see below). By
default, the `name' argument given to the storage constructor names this
directory, but you can set this option to explicitly point to a different
location:
- envdir if not None, names the BerkeleyDB environment directory. The
directory will be created if necessary, but its parent directory must
exist. Additional configuration is available through the BerkeleyDB
DB_CONFIG mechanism.
Berkeley storages need to be checkpointed occasionally, otherwise
automatic recover can take a huge amount of time. You should set up a
automatic recovery can take a huge amount of time. You should set up a
checkpointing policy which trades off the amount of work done periodically
against the recovery time. Note that the Berkeley environment is
automatically, and forcefully, checkpointed twice when it is closed.
......@@ -80,7 +91,7 @@ class BerkeleyConfig:
- min is passed directly to txn_checkpoint()
You can acheive one of the biggest performance wins by moving the Berkeley
You can achieve one of the biggest performance wins by moving the Berkeley
log files to a different disk than the data files. We saw between 2.5 and
7 x better performance this way. Here are attributes which control the
log files.
......@@ -111,17 +122,18 @@ class BerkeleyConfig:
to autopack to. E.g. if packtime is 14400, autopack will pack to 4
hours in the past. For Minimal storage, this value is ignored.
- classicpack is an integer indicating how often an autopack phase should
do a full classic pack. E.g. if classicpack is 24 and frequence is
3600, a classic pack will be performed once per day. Set to zero to
never automatically do classic packs. For Minimal storage, this value
is ignored -- all packs are classic packs.
- gcpack is an integer indicating how often an autopack phase should do a
full garbage collecting pack. E.g. if gcpack is 24 and frequence is
3600, a gc pack will be performed once per day. Set to zero to never
automatically do gc packs. For Minimal storage, this value is ignored;
all packs are gc packs.
Here are some other miscellaneous configuration variables:
- read_only causes ReadOnlyError's to be raised whenever any operation
(except pack!) might modify the underlying database.
"""
envdir = None
interval = 120
kbyte = 0
min = 0
......@@ -129,13 +141,14 @@ class BerkeleyConfig:
cachesize = 128 * 1024 * 1024
frequency = 0
packtime = 4 * 60 * 60
classicpack = 0
read_only = 0
gcpack = 0
read_only = False
def __repr__(self):
d = self.__class__.__dict__.copy()
d.update(self.__dict__)
return """<BerkeleyConfig (read_only=%(read_only)s):
\tenvironment dir:: %(envdir)s
\tcheckpoint interval: %(interval)s seconds
\tcheckpoint kbytes: %(kbyte)s
\tcheckpoint minutes: %(min)s
......@@ -145,7 +158,7 @@ class BerkeleyConfig:
\t----------------------
\tautopack frequency: %(frequency)s seconds
\tpack to %(packtime)s seconds in the past
\tclassic pack every %(classicpack)s autopacks
\tclassic pack every %(gcpack)s autopacks
\t>""" % d
......@@ -260,12 +273,12 @@ class BerkeleyBase(BaseStorage):
# Our storage is based on the underlying BSDDB btree database type.
if reclen is not None:
d.set_re_len(reclen)
# DB 4.1 requires that operations happening in a transaction must be
# performed on a database that was opened in a transaction. Since we
# do the former, we must do the latter. However, earlier DB versions
# don't transactionally protect database open, so this is the most
# portable way to write the code.
openflags = db.DB_CREATE
# DB 4.1.24 requires that operations happening in a transaction must
# be performed on a database that was opened in a transaction. Since
# we do the former, we must do the latter. However, earlier DB
# versions don't transactionally protect database open, so this is the
# most portable way to write the code.
try:
openflags |= db.DB_AUTO_COMMIT
except AttributeError:
......@@ -312,10 +325,11 @@ class BerkeleyBase(BaseStorage):
If last is provided, the new oid will be one greater than that.
"""
# BAW: the last parameter is undocumented in the UML model
newoid = BaseStorage.new_oid(self, last)
if self._len is not None:
# Increment the cached length
self._len += 1
return BaseStorage.new_oid(self, last)
return newoid
def getSize(self):
"""Return the size of the database."""
......@@ -401,8 +415,10 @@ class BerkeleyBase(BaseStorage):
# can't hurt and is more robust.
self._env.txn_checkpoint(0, 0, db.DB_FORCE)
self._env.txn_checkpoint(0, 0, db.DB_FORCE)
lockfile = os.path.join(self._env.db_home, '.lock')
self._lockfile.close()
self._env.close()
os.unlink(lockfile)
# A couple of convenience methods
def _update(self, deltas, data, incdec):
......@@ -466,7 +482,15 @@ def env_from_string(envname, config):
# This is required in order to work around the Berkeley lock
# exhaustion problem (i.e. we do our own application level locks
# rather than rely on Berkeley's finite page locks).
lockfile = LockFile(os.path.join(envname, '.lock'))
lockpath = os.path.join(envname, '.lock')
try:
lockfile = open(lockpath, 'r+')
except IOError, e:
if e.errno <> errno.ENOENT: raise
lockfile = open(lockpath, 'w+')
lock_file(lockfile)
lockfile.write(str(os.getpid()))
lockfile.flush()
try:
# Create, initialize, and open the environment
env = db.DBEnv()
......
......@@ -12,19 +12,11 @@
#
##############################################################################
# Python 2.2 and earlier requires the pybsddb distutils package, but for
# Python 2.3, we really want to use the standard bsddb package. Also, we want
to set a flag that other modules can easily test to see if this stuff is
# available or not. Python 2.2 and 2.3 has bool() but not Python 2.1.
# Requirements:
#
# Get the pybsddb extension module from pybsddb.sourceforge.net and the
# BerkeleyDB libraries from www.sleepycat.com.
try:
bool
except NameError:
def bool(x):
return not not x
# All: BerkeleyDB 4.1.25, available from www.sleepycat.com
# Python 2.2: PyBSDDB 4.1.3 or better, from pybsddb.sf.net
# Python 2.3: nothing extra
try:
from bsddb import db
......@@ -34,6 +26,7 @@ except ImportError:
except ImportError:
db = None
# This flag tells other components whether Berkeley storage is available
is_available = bool(db)
# Useful constants
......
......@@ -56,7 +56,7 @@ class TestAutopackBase(BerkeleyTestBase):
# Autopack every 1 second, 2 seconds into the past, no classic packs
config.frequency = 1
config.packtime = 2
config.classicpack = 0
config.gcpack = 0
return config
def _wait_for_next_autopack(self):
......@@ -119,7 +119,7 @@ class TestAutomaticClassicPack(TestAutopackBase):
# every time.
config.frequency = 1
config.packtime = 2
config.classicpack = 1
config.gcpack = 1
return config
def testAutomaticClassicPack(self):
......@@ -270,278 +270,16 @@ class TestMinimalPack(TestAutopackBase):
raises(KeyError, storage.load, oid2, '')
class RaceConditionBase(BerkeleyTestBase):
    """Base class for tests that provoke the pack/store race condition.

    A threading.Condition is shared with the storage (as ``storage.cv``)
    so that a Synchronized*Storage subclass can pause its pack between the
    _mark() and _sweep() phases while the test commits new data in the gap.
    """

    def setUp(self):
        BerkeleyTestBase.setUp(self)
        # The storage's synchronized _dopack rendezvous on this condvar.
        self._cv = threading.Condition()
        self._storage.cv = self._cv

    def tearDown(self):
        # Clean up any outstanding transactions before the base class
        # closes the storage.
        get_transaction().abort()
        BerkeleyTestBase.tearDown(self)

    def _getPackThread(self, storage):
        """Return a thread object that packs *storage*; subclass must override."""
        raise NotImplementedError

    def _runRaceTest(self, store_before_mark):
        """Common driver for both race-condition tests.

        If *store_before_mark* is true, the second object's store() happens
        before the pack thread runs _mark(); otherwise it happens after
        _mark() but before _sweep().  Either way the object's tid is earlier
        than the pack time while not yet being root reachable, and must
        survive the pack once the transaction commits.
        """
        unless = self.failUnless
        storage = self._storage
        zdb = DB(storage)
        conn = zdb.open()
        root = conn.root()
        # Start by storing a root reachable object.
        obj1 = C()
        obj1.value = 888
        root.obj1 = obj1
        txn = get_transaction()
        txn.note('root -> obj1')
        txn.commit()
        # Now start a transaction and (possibly) store a second object, but
        # don't yet complete the transaction.  This ensures the second
        # object has a tid < packtime, but it isn't root reachable yet.
        obj2 = C()
        t = Transaction()
        storage.tpc_begin(t)
        if store_before_mark:
            storage.store('\0'*7 + '\2', ZERO, zodb_pickle(obj2), '', t)
        # Wait for the clock to tick over so the pack time is strictly
        # later than the commits above, then start the pack thread and wait
        # for its _mark() call to complete.
        now = time.time()
        while now == time.time():
            time.sleep(0.1)
        self._cv.acquire()
        packthread = self._getPackThread(storage)
        packthread.start()
        self._cv.wait()
        if not store_before_mark:
            storage.store('\0'*7 + '\2', ZERO, zodb_pickle(obj2), '', t)
        # Now that _mark() has finished, complete the transaction, which
        # links the object to root.
        root.obj2 = obj2
        rootsn = storage.getSerial(ZERO)
        storage.store(ZERO, rootsn, zodb_pickle(root), '', t)
        storage.tpc_vote(t)
        storage.tpc_finish(t)
        # Notify the pack thread that it can do the sweep and collect, then
        # wait for it to finish.
        self._cv.notify()
        self._cv.wait()
        # We're done with the condvar and the thread.
        self._cv.release()
        packthread.join()
        # Make sure that all the interesting objects are still available.
        rootsn = storage.getSerial(ZERO)
        obj1sn = storage.getSerial('\0'*7 + '\1')
        obj2sn = storage.getSerial('\0'*7 + '\2')
        # obj1's revision was written before the second revision of the root.
        unless(obj1sn < rootsn)
        unless(rootsn == obj2sn)
        unless(obj1sn < obj2sn)

    def testRaceCondition(self):
        # Second object is stored before the pack thread's _mark() phase.
        self._runRaceTest(store_before_mark=True)

    def testEarlierRaceCondition(self):
        # Second object is stored between _mark() and _sweep().
        self._runRaceTest(store_before_mark=False)
# Subclass which does ugly things to _dopack so we can actually test the race
# condition. We need to store a new object in the database between the
# _mark() call and the _sweep() call.
class SynchronizedFullStorage(BDBFullStorage):
    # XXX Cut and paste copy from BDBFullStorage, except where indicated.
    # The only differences are the two "thread coordination" sections, which
    # rendezvous on self.cv (attached by RaceConditionBase.setUp) so a test
    # thread can interleave a store between the _mark() and _sweep() phases.
    def _dopack(self, t, gc=True):
        # t is a TimeTime, or time float, convert this to a TimeStamp object,
        # using an algorithm similar to what's used in FileStorage. We know
        # that our transaction ids, a.k.a. revision ids, are timestamps.
        #
        # BAW: should a pack time in the future be a ValueError? We'd have to
        # worry about clock skew, so for now, we just set the pack time to the
        # minimum of t and now.
        packtime = min(t, time.time())
        t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
        # Python 2 backtick-repr yields the 8-byte timestamp string form.
        packtid = `t0`
        # Collect all revisions of all objects earlier than the pack time.
        self._lock_acquire()
        try:
            self._withtxn(self._collect_revs, packtid)
        finally:
            self._lock_release()
        # Collect any objects with refcount zero.
        self._lock_acquire()
        try:
            self._withtxn(self._collect_objs)
        finally:
            self._lock_release()
        # If we're not doing a classic pack, we're done.
        if not gc:
            return
        # Do a mark and sweep for garbage collection. Calculate the set of
        # objects reachable from the root. Anything else is a candidate for
        # having all their revisions packed away. The set of reachable
        # objects lives in the _packmark table.
        self._lock_acquire()
        try:
            self._withtxn(self._mark, packtid)
        finally:
            self._lock_release()
        # XXX thread coordination code start
        # Wake the waiting test thread and block until it has committed its
        # interleaved transaction and notified us back.
        self.cv.acquire()
        self.cv.notify()
        self.cv.wait()
        # XXX thread coordination code stop
        #
        # Now perform a sweep, using oidqueue to hold all object ids for
        # objects which are not root reachable as of the pack time.
        self._lock_acquire()
        try:
            self._withtxn(self._sweep, packtid)
        finally:
            self._lock_release()
        # Once again, collect any objects with refcount zero due to the mark
        # and sweep garbage collection pass.
        self._lock_acquire()
        try:
            self._withtxn(self._collect_objs)
        finally:
            self._lock_release()
        # XXX thread coordination code start
        # Tell the test thread that the pack has completed.
        self.cv.notify()
        self.cv.release()
        # XXX thread coordination code stop
class FullPackThread(threading.Thread):
    # Helper thread that runs a single gc autopack against the given storage.
    def __init__(self, storage):
        threading.Thread.__init__(self)
        self._storage = storage

    def run(self):
        # Autopack up to the current time, with garbage collection enabled.
        self._storage.autopack(time.time(), gc=True)
class TestFullClassicPackRaceCondition(RaceConditionBase):
    # Run the race-condition tests against the synchronized full storage.
    ConcreteStorage = SynchronizedFullStorage

    def _getPackThread(self, storage):
        # The full storage packs via autopack(..., gc=True) in its own thread.
        return FullPackThread(storage)
# Subclass which does ugly things to _dopack so we can actually test the race
condition. We need to store a new object in the database between the
# _mark() call and the _sweep() call.
class SynchronizedMinimalStorage(BDBMinimalStorage):
    # XXX Cut and paste copy from BDBMinimalStorage, except where indicated.
    # The only differences are the two "thread coordination" sections, which
    # rendezvous on self.cv (attached by RaceConditionBase.setUp) so a test
    # thread can interleave a store between the _mark() and _sweep() phases.
    def _dopack(self):
        # Do a mark and sweep for garbage collection. Calculate the set of
        # objects reachable from the root. Anything else is a candidate for
        # having all their revisions packed away. The set of reachable
        # objects lives in the _packmark table.
        self._lock_acquire()
        try:
            self._withtxn(self._mark)
        finally:
            self._lock_release()
        # XXX thread coordination code start
        # Wake the waiting test thread and block until it has committed its
        # interleaved transaction and notified us back.
        self.cv.acquire()
        self.cv.notify()
        self.cv.wait()
        # XXX thread coordination code stop
        #
        # Now perform a sweep, using oidqueue to hold all object ids for
        # objects which are not root reachable as of the pack time.
        self._lock_acquire()
        try:
            self._withtxn(self._sweep)
        finally:
            self._lock_release()
        # Once again, collect any objects with refcount zero due to the mark
        # and sweep garbage collection pass.
        self._lock_acquire()
        try:
            self._withtxn(self._collect_objs)
        finally:
            self._lock_release()
        # XXX thread coordination code start
        # Tell the test thread that the pack has completed.
        self.cv.notify()
        self.cv.release()
        # XXX thread coordination code stop
class MinimalPackThread(threading.Thread):
    # Helper thread that runs a single pack against the given storage.
    def __init__(self, storage):
        threading.Thread.__init__(self)
        self._storage = storage

    def run(self):
        # Pack up to the current time; Minimal storage packs are always
        # gc packs, so no gc flag is needed here.
        self._storage.pack(time.time(), referencesf)
class TestMinimalClassicPackRaceCondition(RaceConditionBase):
    # Run the race-condition tests against the synchronized minimal storage.
    ConcreteStorage = SynchronizedMinimalStorage

    def _getPackThread(self, storage):
        # The minimal storage packs via pack(t, referencesf) in its own thread.
        return MinimalPackThread(storage)
def test_suite():
    """Build and return the test suite for the autopack tests.

    The suite is empty when Berkeley storage support is not available.
    Note: the diff rendering fused the old makeSuite-based body with the
    new TestSuite-based one; this is the post-merge version.
    """
    suite = unittest.TestSuite()
    # These tests are slow, so only run them at test level 2.
    suite.level = 2
    if BDBStorage.is_available:
        suite.addTest(unittest.makeSuite(TestAutopack))
        suite.addTest(unittest.makeSuite(TestAutomaticClassicPack))
        suite.addTest(unittest.makeSuite(TestMinimalPack))
    return suite
if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment