Commit b6ff5a09 authored by Barry Warsaw

Merging the Berkeley storage's bdb-nolocks branch back into the trunk for ZODB 3.2.
parent 24482ef0
@@ -14,14 +14,14 @@
"""An autopacking Berkeley storage without undo and versioning.
"""
-__version__ = '$Revision: 1.4 $'.split()[-2:][0]
+__version__ = '$Revision: 1.5 $'.split()[-2:][0]
import sys
import os
import struct
import time
-# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
+# This uses the Dunn/Kuchling PyBSDDB3 extension module available from
# http://pybsddb.sourceforge.net
from bsddb3 import db
@@ -61,7 +61,7 @@ class Autopack(BerkeleyBase):
# base class infrastructure and are shared by the Minimal
# implementation.
#
-# serials -- {oid -> serial}
+# serials -- {oid+tid -> serial}
# Maps oids to object serial numbers. The serial number is
# essentially a timestamp used to determine if conflicts have
# arisen, and serial numbers double as transaction ids and object
@@ -104,6 +104,32 @@ class Autopack(BerkeleyBase):
self._oids.close()
BerkeleyBase.close(self)
def _getSerial(self, oid):
c = self._serials.cursor()
try:
lastvalue = None
# Search for the largest oid+revid key in the serials table that
# doesn't have a revid component equal to the current revid.
try:
rec = c.set_range(oid)
except db.DBNotFoundError:
rec = None
while rec:
key, value = rec
koid = key[:8]
ktid = key[8:]
if koid <> oid:
break
lastvalue = value
if ktid == self._serial:
break
rec = c.next()
if lastvalue is None:
return None
return lastvalue[:8]
finally:
c.close()
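# Illustration: the serials and pickles tables described above are keyed by
# the 16-byte concatenation of an 8-byte oid and an 8-byte tid, which is why
# _getSerial() slices the key at offset 8.  The oid/tid values below are
# made-up examples, not values from the storage.
oid = '\0\0\0\0\0\0\0\x2a'                    # hypothetical object id
tid = '\x03\x4e\x1f\x9b\x6b\xb4\x88\x00'      # hypothetical transaction id
key = oid + tid
assert len(key) == 16
assert key[:8] == oid and key[8:] == tid      # the koid/ktid split used above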
def _begin(self, tid, u, d, e):
# Nothing needs to be done
pass
@@ -112,12 +138,41 @@ class Autopack(BerkeleyBase):
# Nothing needs to be done, but override the base class's method
pass
def store(self, oid, serial, data, version, transaction):
self._lock_acquire()
try:
# Transaction guard
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
# We don't support versions
if version <> '':
raise POSException.Unsupported, 'versions are not supported'
oserial = self._getSerial(oid)
if oserial is not None and serial <> oserial:
# BAW: Here's where we'd try to do conflict resolution
raise POSException.ConflictError(serials=(oserial, serial))
tid = self._serial
txn = self._env.txn_begin()
try:
self._serials.put(oid+tid, self._serial, txn=txn)
self._pickles.put(oid+tid, data, txn=txn)
self._actions.put(tid+oid, INC, txn=txn)
self._oids.put(oid, ' ', txn=txn)
except:
txn.abort()
raise
else:
txn.commit()
return self._serial
finally:
self._lock_release()
def _finish(self, tid, u, d, e):
# TBD: what about u, d, and e?
#
# First, append a DEL to the actions for each old object, then update
# the current serials table so that its revision id points to this
-# trancation id.
+# transaction id.
txn = self._env.txn_begin()
try:
c = self._oids.cursor()
@@ -128,8 +183,8 @@ class Autopack(BerkeleyBase):
lastrevid = self._serials.get(oid, txn=txn)
if lastrevid:
self._actions.put(lastrevid+oid, DEC, txn=txn)
-self._serials.put(oid, tid, txn=txn)
rec = c.next()
+self._oids.truncate()
finally:
c.close()
except:
@@ -137,7 +192,6 @@ class Autopack(BerkeleyBase):
raise
else:
txn.commit()
-self._oids.truncate()
# Override BerkeleyBase._abort()
def _abort(self):
@@ -164,30 +218,6 @@ class Autopack(BerkeleyBase):
self._oids.truncate()
self._transaction.abort()
def store(self, oid, serial, data, version, transaction):
# Transaction guard
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
# We don't support versions
if version <> '':
raise POSException.Unsupported, 'versions are not supported'
oserial = self._serials.get(oid)
if oserial is not None and serial <> oserial:
# BAW: Here's where we'd try to do conflict resolution
raise POSException.ConflictError(serials=(oserial, serial))
tid = self._serial
txn = self._env.txn_begin()
try:
self._pickles.put(oid+tid, data, txn=txn)
self._actions.put(tid+oid, INC, txn=txn)
self._oids.put(oid, ' ', txn=txn)
except:
txn.abort()
raise
else:
txn.commit()
return self._serial
def load(self, oid, version):
if version <> '':
raise POSException.Unsupported, 'versions are not supported'
@@ -196,6 +226,7 @@ class Autopack(BerkeleyBase):
def loadSerial(self, oid, serial):
current = self._serials[oid]
+# BAW: should we allow older serials to be retrieved?
if current == serial:
return self._pickles[oid+current]
else:
...
@@ -13,236 +13,316 @@
##############################################################################
"""Berkeley storage without undo or versioning.
-See Full.py for an implementation of Berkeley storage that does support undo
-and versioning.
"""
-__version__ = '$Revision: 1.12 $'[-2:][0]
+__version__ = '$Revision: 1.13 $'[-2:][0]
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
-# http://pybsddb.sourceforge.net. It is compatible with release 3.0 of
+# http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
# PyBSDDB3.
from bsddb3 import db
-# BerkeleyBase.BerkeleyBase class provides some common functionality for both
-# the Full and Minimal implementations. It in turn inherits from
-# ZODB.BaseStorage.BaseStorage which itself provides some common storage
-# functionality.
+# BerkeleyBase class provides some common functionality for BerkeleyDB-based
+# storages. It in turn inherits from BaseStorage which itself provides some
+# common storage functionality.
from BerkeleyBase import BerkeleyBase
-from CommitLog import PacklessLog
from ZODB import POSException
-from ZODB import utils
+from ZODB.utils import U64, p64
+from ZODB.referencesf import referencesf
+ABORT = 'A'
+COMMIT = 'C'
+PRESENT = 'X'
+ZERO = '\0'*8
class Minimal(BerkeleyBase):
#
# Overrides of base class methods
#
def _setupDBs(self): def _setupDBs(self):
# Create the tables used to maintain the relevant information. The # Data Type Assumptions:
# minimal storage needs two tables:
# #
# serials -- maps object ids (oids) to object serial numbers. The # - Object ids (oid) are 8-bytes
# serial number is essentially a timestamp used to # - Objects have revisions, with each revision being identified by a
# determine if conflicts have arisen. If an attempt is # unique serial number.
# made to store an object with a serial number that is # - Transaction ids (tid) are 8-bytes
# different than the serial number we already have for # - Data pickles are of arbitrary length
# the object, a ConflictError is raised.
#
# pickles -- maps oids to the object's data pickles.
#
self._serials = self._setupDB('serials')
self._pickles = self._setupDB('pickles')
def _begin(self, tid, u, d, e):
# Begin the current transaction. Currently this just makes sure that
# the commit log is in the proper state.
if self._commitlog is None:
# JF: Chris was getting some weird errors / bizarre behavior from
# Berkeley when using an existing directory or having non-BSDDB
# files in that directory.
self._commitlog = PacklessLog(dir=self._env.db_home)
self._commitlog.start()
def _finish(self, tid, u, d, e):
# This is called from the storage interface's tpc_finish() method.
# Its responsibilities are to finish the transaction with the
# underlying database.
# #
# We have a problem here because tpc_finish() is not supposed to raise # The Minimal storage uses the following tables:
# any exceptions. However because finishing with the backend database
# /can/ cause exceptions, they may be thrown from here as well. If
# that happens, we abort the transaction.
# #
# Because of the locking semantics issue described above, finishing # serials -- {oid -> [serial]}
# the transaction in this case involves: # Maps oids to serial numbers. Each oid can be mapped to 1 or 2
# - starting a transaction with Berkeley DB # serial numbers (this is for optimistic writes). If it maps to
# - replaying our commit log for object updates # two serial numbers, then the current one is determined by the
# - storing those updates in BSDDB # pending flag (see below).
# - committing those changes to BSDDB
# #
# Once the changes are committed successfully to BSDDB, we're done # pickles -- {oid+serial -> pickle}
# with our log file. # Maps the object revisions to the revision's pickle data.
# #
# tid is the transaction id # refcounts -- {oid -> count}
# Maps the oid to the reference count for the object. This
# reference count is updated during the _finish() call. When it
# goes to zero, the object is automatically deleted.
# #
# u is the user associated with the transaction, used for # oids -- [oid]
# auditing, etc. # This is a list of oids of objects that are modified in the
# current uncommitted transaction.
# #
# d is the description of the transaction, arbitrary string, # pending -- tid -> 'A' | 'C'
# but might contain path information # This is an optional flag which says what to do when the database
# is recovering from a crash. The flag is normally 'A' which
# means any pending data should be aborted. At the start of the
# tpc_finish() this flag will be changed to 'C' which means, upon
# recovery/restart, all pending data should be committed. Outside
# of any transaction (e.g. before the tpc_begin()), there will be
# no pending entry. It is a database invariant that if the
# pending table is empty, the oids table must also be empty.
# #
# e is the transaction extension, extra metadata about the self._serials = self._setupDB('serials', db.DB_DUP)
# transaction, such quotas or other custom storage self._pickles = self._setupDB('pickles')
# policies. self._refcounts = self._setupDB('refcounts')
txn = self._env.txn_begin() self._oids = self._setupDB('oids')
self._pending = self._setupDB('pending')
# Do recovery and consistency checks
pendings = self._pending.keys()
assert len(pendings) <= 1
if len(pendings) == 0:
assert len(self._oids) == 0
return
# Do recovery
tid = pendings[0]
flag = self._pending.get(tid)
assert flag in (ABORT, COMMIT)
self._lock_acquire()
try: try:
# BAW: all objects have the same serial number? JF: all the if flag == ABORT:
# existing storages re-use the transaction's serial number for all self._do(self._doabort, tid)
# the objects, but they don't have to. In Jeremy's SimpleStorage,
# it's just a counter. _serial is set in BaseStorage.py during
# tpc_begin().
serial = self._serial
while 1:
rec = self._commitlog.next()
if rec is None:
break
oid, pickle = rec
# Put the object's serial number
self._serials.put(oid, serial, txn)
# Put the object's pickle data
self._pickles.put(oid, pickle, txn)
except:
# If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since
# its changes were never committed), and re-raise the exception.
txn.abort()
raise
else: else:
# Everything is hunky-dory. Commit the Berkeley transaction, and self._do(self._docommit, tid)
# reset the commit log for the next transaction. finally:
txn.commit() self._lock_release()
self._closelog()
def _abort(self):
# Throw away the current transaction. Since we don't have a
# transaction open to Berkeley, what this really means is that we're
# done with our commit log, so we should reset it.
self._closelog()
def close(self): def close(self):
# BAW: the original implementation also deleted these attributes. Was
# that just to reclaim the garbage?
self._serials.close() self._serials.close()
self._pickles.close() self._pickles.close()
# Base class implements some useful close behavior self._refcounts.close()
self._oids.close()
self._pending.close()
BerkeleyBase.close(self) BerkeleyBase.close(self)
# def _do(self, meth, tid):
# Public storage interface txn = self._env.txn_begin()
# try:
meth(tid, txn)
def load(self, oid, version): self._oids.truncate(txn)
"""Return the object pickle and serial number for the object self._pending.truncate(txn)
referenced by object id `oid'. The object is loaded from the back-end except:
storage. txn.abort()
self._docheckpoint()
raise
else:
txn.commit()
self._docheckpoint()
`version' is required by the storage interface, but it is ignored def _doabort(self, tid, txn):
because undo and versions are not supported. co = cs = None
"""
self._lock_acquire()
try: try:
serial = self._serials[oid] co = self._oids.cursor(txn=txn)
pickle = self._pickles[oid] cs = self._serials.cursor(txn=txn)
return pickle, serial rec = co.first()
while rec:
oid = rec[0]
rec = co.next()
try:
cs.set_both(oid, tid)
except db.DBNotFoundError:
pass
else:
cs.delete()
# And delete the pickle table entry for this revision.
self._pickles.delete(oid+tid, txn=txn)
finally: finally:
self._lock_release() # There's a small window of opportunity for leaking a cursor here,
# if co.close() were to fail. In practice this shouldn't happen.
def store(self, oid, serial, data, version, transaction): if co: co.close()
"""Store the object referenced by `oid'. if cs: cs.close()
The object is stored to the transaction named by `transaction', in def _docommit(self, tid, txn):
preparation for the commit or abort of the transaction (i.e. it is not deltas = {}
stored to the underlying database yet). co = cs = None
try:
co = self._oids.cursor(txn=txn)
cs = self._serials.cursor(txn=txn)
rec = co.first()
while rec:
oid = rec[0]
rec = co.next()
# Remove from the serials table all entries with key oid where
# the serial is not tid. These are the old revisions of the
# object. At the same time, we want to collect the oids of
# the objects referred to by this revision's pickle, so that
# later we can decref those reference counts.
srec = cs.set(oid)
while srec:
soid, stid = srec
if soid <> oid:
break
if stid <> tid:
cs.delete()
data = self._pickles.get(oid+stid, txn=txn)
assert data is not None
self._update(deltas, data, -1)
self._pickles.delete(oid+stid, txn=txn)
srec = cs.next_dup()
# Now add incref deltas for all objects referenced by the new
# revision of this object.
data = self._pickles.get(oid+tid, txn=txn)
assert data is not None
self._update(deltas, data, 1)
finally:
# There's a small window of opportunity for leaking a cursor here,
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
# Now, to finish up, we need apply the refcount deltas to the
# refcounts table, and do recursive collection of all refcount == 0
# objects.
while deltas:
deltas = self._update_refcounts(deltas, txn)
`serial' is the serial number of the object. If it does not match the def _update_refcounts(self, deltas, txn):
stored serial number, a ConflictError is raised. newdeltas = {}
for oid, delta in deltas.items():
rc = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
assert rc >= 0
if rc == 0:
# The reference count for this object has just gone to zero,
# so we can safely remove all traces of it from the serials,
# pickles and refcounts table. Note that before we remove its
# pickle, we need to decref all the objects referenced by it.
current = self._getCurrentSerial(oid)
data = self._pickles.get(oid+current, txn=txn)
self._update(newdeltas, data, -1)
# And delete the serials, pickle and refcount entries. At
# this point, I believe we should have just one serial entry.
self._serials.delete(oid, txn=txn)
assert self._serials.get(oid, txn=txn) is None
self._refcounts.delete(oid, txn=txn)
self._pickles.delete(oid+current, txn=txn)
else:
self._refcounts.put(oid, p64(rc), txn=txn)
# Return the list of objects referenced by pickles just deleted in
# this round, for decref'ing on the next go 'round.
return newdeltas
`data' is object's data pickle. def _begin(self, tid, u, d, e):
# When a transaction begins, we set the pending flag to ABORT,
# meaning, if we crash between now and the time we vote, all changes
# will be aborted.
self._pending[self._serial] = ABORT
`version' is required by the storage interface, but it must be set to def store(self, oid, serial, data, version, transaction):
None because undo and versions are not supported.
"""
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
# We don't support versions
# Make sure the version is false. Usually it's an empty string, but if version <> '':
# we have to make sure. raise POSException.Unsupported, 'versions are not supported'
if version: # All updates must be done with the application lock acquired
raise POSException.Unsupported, "Versions aren't supported"
self._lock_acquire() self._lock_acquire()
try: try:
oserial = self._serials.get(oid) oserial = self._getCurrentSerial(oid)
if oserial is not None and serial <> oserial: if oserial is not None and serial <> oserial:
# The object exists in the database, but the serial number # The object exists in the database, but the serial number
# given in the call is not the same as the last stored serial # given in the call is not the same as the last stored serial
# number. Raise a ConflictError. # number. Raise a ConflictError.
raise POSException.ConflictError( #
serials=(oserial, serial)) # BAW: do application level conflict resolution
# Our serial number is updated in BaseStorage's tpc_begin() call, raise POSException.ConflictError(serials=(oserial, serial))
# which sets the serial number to the current timestamp. # Optimistically write to the serials and pickles table. Be sure
serial = self._serial # to also update the oids table for this object too.
# Write the object's pickle data to the commit log file newserial = self._serial
self._commitlog.write_object(oid, data) txn = self._env.txn_begin()
try:
self._serials.put(oid, newserial, txn=txn)
self._pickles.put(oid+newserial, data, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
except:
txn.abort()
self._docheckpoint()
raise
else:
txn.commit()
self._docheckpoint()
finally: finally:
self._lock_release() self._lock_release()
# Return our cached serial number for the object # Return the new serial number for the object
return serial return newserial
def pack(self, t, getrefsfunc): def _finish(self, tid, u, d, e):
"""Pack the storage. # Twiddle the pending flag to COMMIT now since after the vote call, we
# promise that the changes will be committed, no matter what. The
# recovery process will check this.
self._pending[self._serial] = COMMIT
self._do(self._docommit, self._serial)
Since this storage does not support versions, packing serves only to def _abort(self):
remove any objects that are not referenced from the root of the tree self._do(self._doabort, self._serial)
(i.e. they are garbage collected).
BAW: where are `t' and `getrefsfunc' defined in the model? And #
exactly what are their purpose and semantics? # Accessor interface
""" #
self._lock_acquire()
def _getCurrentSerial(self, oid):
# BAW: We must have the application level lock here.
c = self._serials.cursor()
try: try:
# Build an index only of those objects reachable from the root. # There can be zero, one, or two entries in the serials table for
# Unfortunately, we do this in memory, so the memory footprint of # this oid. If there are no entries, raise a KeyError (we know
# packing may still be substantial. # nothing about this object).
# #
# Known root objects are kept in this list and as new ones are # If there is exactly one entry then this has to be the entry for
# found, their oids are pushed onto the front of the list. It is # the object, regardless of the pending flag.
# also added to the seen dictionary, which keeps track of objects #
# we've seen already. When roots is empty, we're done visiting # If there are two entries, then we need to look at the pending
# all the objects. # flag to decide which to return (there /better/ be a pending flag
roots = ['\0\0\0\0\0\0\0\0'] # set!). If the pending flag is COMMIT then we've already voted
seen = {} # so the second one is the good one. If the pending flag is ABORT
while roots: # then we haven't yet committed to this transaction so the first
# Get the next oid from the roots list # one is the good one.
oid = roots.pop() serials = []
# Skip it if we've already seen it try:
if seen.has_key(oid): rec = c.set(oid)
continue except db.DBNotFoundError:
# Get the pickle corresponding to the object id and scan it rec = None
# for references to other objects. This is done by the while rec:
# magical `getrefsfunc' function given as an argument. serials.append(rec[1])
pickle = self._pickles[oid] rec = c.next_dup()
seen[oid] = 1 if not serials:
# This will prepend any new oids we'll need to scan return None
getrefsfunc(pickle, roots) if len(serials) == 1:
# Now, go through every oid for which we have a pickle, and if we return serials[0]
# have not seen it, then it must be garbage (because it was never pending = self._pending.get(self._serial)
# reached from one of the roots). In that case, delete its entry assert pending in (ABORT, COMMIT)
# in the pickle index. if pending == ABORT:
for oid in self._pickles.keys(): return serials[0]
if not seen.has_key(oid): return serials[1]
del self._pickles[oid] finally:
c.close()
def load(self, oid, version):
if version <> '':
raise POSException.Unsupported, 'versions are not supported'
self._lock_acquire()
try:
# Get the current serial number for this object
serial = self._getCurrentSerial(oid)
if serial is None:
raise KeyError, 'Object does not exist: %r' % oid
# Get this revision's pickle data
return self._pickles[oid+serial], serial
finally: finally:
self._lock_release() self._lock_release()
def modifiedInVersion(self, oid):
# So BaseStorage.getSerial() just works. Note that this storage
# doesn't support versions.
return ''
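# Hedged usage sketch: the Minimal storage is opened on a Berkeley
# environment directory and wrapped in a ZODB database like any other
# storage.  The path below is hypothetical, and the ZODB-3.x era API
# (ZODB.DB, the get_transaction() builtin) is assumed.
from ZODB import DB
from Minimal import Minimal

storage = Minimal('/var/zodb/minimal-env')
db = DB(storage)
conn = db.open()
root = conn.root()
root['answer'] = 42
get_transaction().commit()
db.close()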
@@ -23,84 +23,69 @@ from types import StringType
# http://pybsddb.sourceforge.net
from bsddb3 import db
-# BaseStorage provides some common storage functionality. It is derived from
-# UndoLogCompatible.UndoLogCompatible, which "[provides] backward
-# compatability with storages that have undoLog, but not undoInfo."
-#
-# BAW: I'm not entirely sure what that means, but the UndoLogCompatible
-# subclass provides one method:
-#
-# undoInfo(first, last, specification). Unfortunately this method appears to
-# be undocumented. Jeremy tells me it's still required though.
-#
-# BaseStorage provides primitives for lock acquisition and release,
-# abortVersion(), commitVersion() and a host of other methods, some of which
-# are overridden here, some of which are not.
+# BaseStorage provides primitives for lock acquisition and release, and a host
+# of other methods, some of which are overridden here, some of which are not.
from ZODB import POSException
+from ZODB.lock_file import lock_file
from ZODB.BaseStorage import BaseStorage
+from ZODB.referencesf import referencesf
-__version__ = '$Revision: 1.19 $'.split()[-2:][0]
+GBYTES = 1024 * 1024 * 1000
+__version__ = '$Revision: 1.20 $'.split()[-2:][0]
-# Lock usage is inherently unbounded because there may be an unlimited number
# of objects actually touched in any single transaction, and worst case could
# be that each object is on a different page in the database. Berkeley BTrees
# implement a lock per leaf page, plus a lock per level. We try to limit the
# negative effects of this by writing as much data optimistically as we can.
# But there's no way to completely avoid this. So this value is used to size
# the lock subsystem before the environment is opened.
DEFAULT_MAX_LOCKS = 20000
class BerkeleyConfig:
"""Bag of bits for describing various underlying configuration options.
Berkeley databases are wildly configurable, and this class exposes some of
-that. Two important configuration options are the size of the lock table
-and the checkpointing policy. To customize these options, instantiate one
-of these classes and set the attributes below to the desired value. Then
-pass this instance to the Berkeley storage constructor, using the `config'
-keyword argument.
+that. To customize these options, instantiate one of these classes and
+set the attributes below to the desired value. Then pass this instance to
+the Berkeley storage constructor, using the `config' keyword argument.
Locks in Berkeley are a limited and static resource; they can only be
changed before the environment is opened. It is possible for Berkeley
based storages to exhaust the available locks because worst case is to
consume one lock per object being modified, and transactions are unbounded
in the number of objects they modify. See
http://www.sleepycat.com/docs/ref/lock/max.html
for a discussion on lock sizing. These attributes control the lock
sizing:
- numlocks is passed directly to set_lk_max_locks() when the environment
is opened.
-You will need to find the right balance between the number of locks
-allocated and the system resources that consumes. If the locks are
-exhausted a TransactionTooLargeError can get raised during commit.
-To improve recovery times in case of failures, you should set up a
-checkpointing policy when you create the database. Note that the database
-is automatically, and forcefully, checkpointed twice when it is closed.
+Berkeley storages need to be checkpointed occasionally, otherwise
+automatic recover can take a huge amount of time. You should set up a
+checkpointing policy which trades off the amount of work done periodically
+against the recovery time. Note that the Berkeley environment is
+automatically, and forcefully, checkpointed twice when it is closed.
But an exception during processing (e.g.
The following checkpointing attributes are supported:
-- interval indicates the maximum number of calls to tpc_finish() after
-which a checkpoint is performed.
+- interval indicates the approximate number of Berkeley transaction
+commits and aborts after which a checkpoint is performed. Berkeley
+transactions are performed after ZODB aborts, commits, and stores.
- kbytes is passed directly to txn_checkpoint()
- min is passed directly to txn_checkpoint()
+You can acheive one of the biggest performance wins by moving the Berkeley
+log files to a different disk than the data files. We saw between 2.5 and
+7 x better performance this way. Here are attributes which control the
+log files.
- logdir if not None, is passed to the environment's set_lg_dir() method
before it is opened.
You can also improve performance by tweaking the Berkeley cache size.
Berkeley's default cache size is 256KB which is usually too small. Our
default cache size is 128MB which seems like a useful tradeoff between
resource consumption and improved performance. You might be able to get
slightly better results by turning up the cache size, although be mindful
of your system's limits. See here for more details:
http://www.sleepycat.com/docs/ref/am_conf/cachesize.html
These attributes control cache size settings:
- cachesize should be the size of the cache in bytes.
""" """
numlocks = DEFAULT_MAX_LOCKS
interval = 100 interval = 100
kbyte = 0 kbyte = 0
min = 0 min = 0
logdir = None logdir = None
cachesize = 128 * 1024 * 1024
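# A hedged configuration sketch based on the docstring above: instantiate a
# BerkeleyConfig, tweak the attributes it documents, and hand it to the
# storage constructor via the `config' keyword.  The environment and log
# directory paths below are hypothetical.
from BerkeleyBase import BerkeleyConfig
from Minimal import Minimal

config = BerkeleyConfig()
config.interval = 50                      # checkpoint after ~50 Berkeley txns
config.cachesize = 256 * 1024 * 1024      # larger cache than the 128MB default
config.logdir = '/mnt/fastdisk/bdb-logs'  # Berkeley log files on another disk
storage = Minimal('/var/zodb/minimal-env', config=config)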
@@ -153,7 +138,7 @@ class BerkeleyBase(BaseStorage):
if env == '':
raise TypeError, 'environment name is empty'
elif isinstance(env, StringType):
-self._env = env_from_string(env, self._config)
+self._env, self._lockfile = env_from_string(env, self._config)
else:
self._env = env
@@ -161,22 +146,12 @@ class BerkeleyBase(BaseStorage):
# Initialize a few other things
self._prefix = prefix
-self._commitlog = None
# Give the subclasses a chance to interpose into the database setup
# procedure
self._setupDBs()
# Initialize the object id counter.
self._init_oid()
def _closelog(self):
if self._commitlog:
self._commitlog.finish()
# JF: unlinking might be too inefficient. JH: might use mmap
# files. BAW: maybe just truncate the file, or write a length
# into the headers and just zero out the length.
self._commitlog.close(unlink=1)
self._commitlog = None
def _setupDB(self, name, flags=0):
"""Open an individual database with the given flags.
@@ -229,7 +204,7 @@ class BerkeleyBase(BaseStorage):
# BAW: the last parameter is undocumented in the UML model
if self._len is not None:
# Increment the cached length
-self._len = self._len + 1
+self._len += 1
return BaseStorage.new_oid(self, last)
def getSize(self):
@@ -238,22 +213,8 @@ class BerkeleyBase(BaseStorage):
filename = os.path.join(self._env.db_home, 'zodb_pickles')
return os.path.getsize(filename)
# BAW: this overrides BaseStorage.tpc_vote() with exactly the same
# implementation. This is so Zope 2.3.1, which doesn't include the change
# to BaseStorage, will work with Berkeley. Once we can ignore older
# versions of ZODB, we can get rid of this.
def tpc_vote(self, transaction):
self._lock_acquire()
try:
if transaction is not self._transaction: return
self._vote()
finally:
self._lock_release()
def _vote(self):
-# Make a promise to commit all the registered changes. Rewind and put
-# our commit log in the PROMISED state.
-self._commitlog.promise()
+pass
def _finish(self, tid, user, desc, ext):
"""Called from BaseStorage.tpc_finish(), this commits the underlying
@@ -272,7 +233,6 @@ class BerkeleyBase(BaseStorage):
"""Called from BaseStorage.tpc_abort(), this aborts the underlying
BSDDB transaction.
"""
-self._closelog()
self._transaction.abort()
def _clear_temp(self):
@@ -297,22 +257,38 @@ class BerkeleyBase(BaseStorage):
# can't hurt and is more robust.
self._env.txn_checkpoint(0, 0, db.DB_FORCE)
self._env.txn_checkpoint(0, 0, db.DB_FORCE)
+lockfile = os.path.join(self._env.db_home, '.lock')
+self._lockfile.close()
self._env.close()
-self._closelog()
+os.unlink(lockfile)
# Useful for debugging
def _lockstats(self):
d = self._env.lock_stat()
return 'locks = [%(nlocks)d/%(maxnlocks)d]' % d
def _docheckpoint(self):
+# Periodically checkpoint the database. This is called approximately
+# once per Berkeley transaction commit or abort.
config = self._config
config._counter += 1
if config._counter > config.interval:
self._env.txn_checkpoint(config.kbyte, config.min)
config._counter = 0
def _update(self, deltas, data, incdec):
refdoids = []
referencesf(data, refdoids)
for oid in refdoids:
rc = deltas.get(oid, 0) + incdec
if rc == 0:
# Save space in the dict by zapping zeroes
del deltas[oid]
else:
deltas[oid] = rc
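# A self-contained illustration of the delta bookkeeping above, with the
# pickle scanning replaced by plain lists of oids (referencesf normally
# extracts those oids from the pickle data); the oid strings are made up.
def update(deltas, refoids, incdec):
    for oid in refoids:
        rc = deltas.get(oid, 0) + incdec
        if rc == 0:
            del deltas[oid]          # zap zeroes to keep the dict small
        else:
            deltas[oid] = rc

deltas = {}
update(deltas, ['oidA', 'oidB'], -1)      # old revision referenced A and B
update(deltas, ['oidB', 'oidC'], 1)       # new revision references B and C
assert deltas == {'oidA': -1, 'oidC': 1}  # B cancels out and is dropped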
def _withlock(self, meth, *args):
self._lock_acquire()
try:
return meth(*args)
finally:
self._lock_release()
def env_from_string(envname, config):
@@ -323,16 +299,30 @@ def env_from_string(envname, config):
except OSError, e:
if e.errno <> errno.EEXIST: raise
# already exists
# Create the lock file so no other process can open the environment.
# This is required in order to work around the Berkeley lock
# exhaustion problem (i.e. we do our own application level locks
# rather than rely on Berkeley's finite page locks).
lockpath = os.path.join(envname, '.lock')
try:
lockfile = open(lockpath, 'r+')
except IOError, e:
if e.errno <> errno.ENOENT: raise
lockfile = open(lockpath, 'w+')
lock_file(lockfile)
lockfile.write(str(os.getpid()))
lockfile.flush()
+# Create, initialize, and open the environment
env = db.DBEnv()
-env.set_lk_max_locks(config.numlocks)
if config.logdir is not None:
env.set_lg_dir(config.logdir)
+gbytes, bytes = divmod(config.cachesize, GBYTES)
+env.set_cachesize(gbytes, bytes)
env.open(envname,
db.DB_CREATE # create underlying files as necessary
| db.DB_RECOVER # run normal recovery before opening
| db.DB_INIT_MPOOL # initialize shared memory buffer pool
-| db.DB_INIT_LOCK # initialize locking subsystem
| db.DB_INIT_TXN # initialize transaction subsystem
| db.DB_THREAD # we use the environment from other threads
)
-return env
+return env, lockfile
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# CommitLog class
#
# This class implements the action log for writes to non-committed
# transactions. They are replayed and applied all at once during _finish()
# which is called by BaseStorage's tpc_finish(). See FullImplementation.txt
# and notes for some discussion of the issues involved.
#
# BAW: understand this more, and figure out why we can't use BSDDB's
# lock_detect().
#
# File format:
#
# The log file consists of a standard header, followed by a number of marshal
# records. Each marshal consists of a single character opcode and an
# argument. The specific opcodes and arguments depend on the type of storage
# using the CommitLog instance, and derived classes provide a more specific
# interface for the storage.
__version__ = '$Revision: 1.12 $'.split()[-2:][0]
import sha
import struct
import os
import time
import marshal
import errno
from types import StringType
# JF: POSError is the ZODB version of Exception; it's fairly generic
# so a more specific exception might be better. E.g. StorageError
from ZODB.POSException import POSError
# Log file states.
#
# START is the transaction start state, and the log file must either not exist
# or be in the COMMITTED state in order to enter the START state.
START = 'S'
# OPEN state is where objects have begun to be stored into the log file during
# an open transaction. OPEN can only be entered from the START state, and
# upon the first object store, the state goes from START->OPEN. From here the
# transaction will either be committed or aborted.
OPEN = 'O'
# If the transaction is aborted, everything is discarded and the commit log is
# moved to the START state. This can only happen from the START or OPEN
# state. If the transaction is finished, then we are guaranteeing that the
# stored objects will be saved to the back-end storage. In that case we
# change the state to PROMISED and allow the storage to read the objects out
# again.
#
# Once the transaction commit has succeeded, the file's state is moved back to
# START for the next transaction.
PROMISED = 'P'
# Magic number for log file header
MAGIC = 0xfeedb00bL
# Version number
SCHEMA = 0x01
# The format of the file header. It consists of:
# - a 32 bit magic number
# - the 16-bit schema number
# - the single character commit log state flag
FMT = '>IHc'
FMTSZ = struct.calcsize(FMT)
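# Illustration of the header layout described above, using the constants
# defined in this module; the state byte 'S' is the START flag.
import struct
header = struct.pack('>IHc', 0xfeedb00bL, 0x01, 'S')
assert len(header) == struct.calcsize('>IHc') == 7
magic, schema, state = struct.unpack('>IHc', header)
assert (magic, schema, state) == (0xfeedb00bL, 0x01, 'S')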
class CommitLogError(POSError):
"""Base class for errors raised by the CommitLog class."""
class TruncationError(CommitLogError):
"""A log file truncation was detected on a header read."""
class LogCorruptedError(CommitLogError):
"""A read of a data record was incomplete or corrupted."""
class StateTransitionError(CommitLogError):
"""An illegal state transition was attempted."""
def __init__(self, state):
self.__state = state
def __str__(self):
return 'at invalid state: %c' % self.__state
class UncommittedChangesError(StateTransitionError):
"""The commit log has uncommitted changes.
This exception indicates that a promised commit of object states was not
either aborted or finished committing. No further object updates will be
allowed until the log file is replayed and explicitly cleared.
"""
class CommitLog:
def __init__(self, file=None, dir='.'):
"""Initialize the commit log, usually with a new file.
This is not a real temporary file because if we crash before the
transaction is committed, we'll need to replay this log. However, we
also need to be especially careful about the mode this file is written
in, otherwise evil processes could snoop information.
If `file' is provided it must be an already open file-like object that
supports seek() and appending r/w in binary mode.
Or `file' can be the name of a file which will be created with those
semantics. If `file' is omitted, we create a new such file, from a
(hopefully uniquely) crafted filename. In either of these two cases,
the filename is relative to dir (the default is the current
directory).
The commit file has a header consisting of the following information:
- a 32 bit magic number
- the 16-bit schema number
- the single character commit log state flag
Integers are standard size, big-endian.
"""
# BAW: is our filename unique enough? Are we opening it up with too
# much or too little security?
self._unlink = 1
self._fp = None
if file is None:
# Create the file from scratch. We know the file has to be in the
# init state, so just go ahead and write the appropriate header.
now = time.time()
pid = os.getpid()
file = sha.new(`now` + `pid`).hexdigest()
# BAW: what directory to create this in? /tmp doesn't seem right.
omask = os.umask(077) # -rw-------
try:
try:
os.makedirs(dir)
except OSError, e:
if e.errno <> errno.EEXIST: raise
self._fp = open(os.path.join(dir, file), 'w+b')
finally:
os.umask(omask)
self._writehead(START)
elif isinstance(file, StringType):
# Open the file in the proper mode. If it doesn't exist, write
# the start header.
omask = os.umask(077)
try:
try:
os.makedirs(dir)
except OSError, e:
if e.errno <> errno.EEXIST: raise
self._fp = open(os.path.join(dir, file), 'w+b')
finally:
os.umask(omask)
# Attempt to read any existing header. If we get an error, assume
# the file was created from scratch and write the start header.
try:
self._readhead()
except TruncationError:
self._writehead(START)
else:
# File object was created externally; maybe we're replaying an old
# log. Read the file's header and initialize our state from it.
self._fp = file
self._readhead()
self._unlink = 0
def get_filename(self):
return self._fp.name
def _writehead(self, state, pack=struct.pack):
# Scribble a new header onto the front of the file. state is the
# 1-character state flag.
assert len(state) == 1
self._state = state
data = pack(FMT, MAGIC, SCHEMA, state)
pos = self._fp.tell()
self._fp.seek(0)
self._fp.write(data)
self._fp.flush()
# Seek to the old file position, or just past the header, whichever is
# farther away.
self._fp.seek(max(self._fp.tell(), pos))
def _readhead(self, unpack=struct.unpack):
# Read the current file header, and return a tuple of the header data.
# If the file for some reason doesn't contain a complete header, a
# TruncationError is raised.
pos = self._fp.tell()
self._fp.seek(0)
header = self._fp.read(FMTSZ)
if len(header) <> FMTSZ:
raise TruncationError('short header read: %d bytes' % len(header))
try:
magic, schema, state = unpack(FMT, header)
except struct.error, e:
raise LogCorruptedError, e
if magic <> MAGIC:
raise LogCorruptedError, 'bad magic number: %x' % magic
#
# for now there's no backwards compatibility necessary
if schema <> SCHEMA:
raise LogCorruptedError, 'bad version number: %d' % schema
self._state = state
# Seek to the old file position, or just past the header, whichever is
# farther away.
self._fp.seek(max(self._fp.tell(), pos))
def _append(self, key, record, dump=marshal.dump):
# Store the next record in the file. Key is a single character
# marking the record type. Record must be a tuple of some record-type
# specific data. Record types and higher level write methods are
# defined in derived classes.
assert len(key) == 1
# Make assertions about the file's state
assert self._state in (START, OPEN, PROMISED)
if self._state == START:
self._writehead(OPEN)
elif self._state == OPEN:
pass
elif self._state == PROMISED:
raise UncommittedChangesError(
'Cannot write while promised updates remain uncommitted')
# We're good to go, append the object
self._fp.seek(0, 2) # to end of file
if self._fp.tell() < FMTSZ:
raise TruncationError, 'Last seek position < end of headers'
dump((key, record), self._fp)
self._fp.flush()
def start(self, load=marshal.load):
"""Move the file pointer to the start of the record data."""
self._readhead()
if self._state <> START:
raise StateTransitionError, self._state
self._fp.seek(FMTSZ)
def promise(self):
"""Move the transition to the PROMISED state, where we guarantee that
any changes to the object state will be committed to the backend
database before we ever write more updates.
"""
if self._state not in (START, OPEN):
raise StateTransitionError, self._state
self._writehead(PROMISED)
self._fp.seek(FMTSZ)
def finish(self):
"""We've finished committing all object updates to the backend
storage, or we're aborting the transaction. In either case, we're done
with the data in our log file. Move the transition back to the start
state.
"""
# We need to truncate the file after writing the header, for the
# algorithm above to work.
self._writehead(START)
self._fp.truncate()
def _next(self, load=marshal.load):
# Read the next marshal record from the log. If there is no next
# record return None, otherwise return a 2-tuple of the record type
# character and the data tuple.
try:
return load(self._fp)
except EOFError:
return None
# BAW: let TypeError percolate up.
def next(self):
raise NotImplementedError
def close(self, unlink=1):
"""Close the file.
If unlink is true, delete the underlying file object too.
"""
if self._fp:
self._fp.close()
if (unlink or self._unlink) and os.path.exists(self._fp.name):
os.unlink(self._fp.name)
self._fp = None
def __del__(self):
# Unsafe, and file preserving close
self.close()
class PacklessLog(CommitLog):
# Higher level interface for reading and writing version-less/undo-less
# log records.
#
# Record types:
# 'o' - object state, consisting of an oid, and the object's pickle
# data
#
def write_object(self, oid, pickle):
self._append('o', (oid, pickle))
def next(self):
# Get the next object pickle data. Return the oid and the pickle
# string. Raise a LogCorruptedError if there's an incomplete marshal
# record.
rec = self._next()
if rec is None:
return None
try:
key, (oid, pickle) = rec
except ValueError:
raise LogCorruptedError, 'incomplete record'
if key <> 'o':
raise LogCorruptedError, 'bad record key: %s' % key
return oid, pickle
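# Hedged sketch of the PacklessLog life cycle implied by the state machine
# above, mirroring how the old Minimal storage drove it (the directory and
# record values are hypothetical):
log = PacklessLog(dir='/tmp/commitlog-demo')
log.start()                              # file must be in the START state
log.write_object('\0'*8, 'a pickle')     # first write moves START -> OPEN
log.promise()                            # vote: OPEN -> PROMISED, rewind
rec = log.next()
while rec is not None:
    oid, pickle = rec                    # replay records into the backend here
    rec = log.next()
log.finish()                             # back to START; log file truncated
log.close()                              # also unlinks the underlying file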
class FullLog(CommitLog):
# Higher level interface for reading and writing full versioning and
# undoable log records.
#
# Record types:
# 'o' - object state, consisting of an oid, vid, non-version revision
# id (nvrevid), live revision id (lrevid), the object's pickle,
# and a previous revision id (prevrevid). Note that there are
# actually higher level API method that write essentially the
# same record with some of the elements defaulted to the empty
# string or the "all-zeros" string.
# 'a' - Like 'o' but used in abortVersion transaction so that the
# object's serial number doesn't appear to change after the
# abortVersion.
# 'x' - Like 'o', but doesn't write a metadata record during _finish
# since the metadata record was written optimistically during
# the store() call.
# 'v' - new version record, consisting of a version string and a
# version id
# 'd' - discard version, consisting of a version id
#
def __init__(self, file=None, dir='.'):
"""Initialize the `full' commit log, usually with a new file."""
CommitLog.__init__(self, file, dir)
self.__versions = {}
self.__prevrevids = {}
def finish(self):
CommitLog.finish(self)
self.__versions.clear()
self.__prevrevids.clear()
def get_vid(self, version, missing=None):
"""Given a version string, return the associated vid.
If not present, return `missing'.
"""
return self.__versions.get(version, missing)
def get_prevrevid(self, oid, missing=None):
"""Given an object id, return the associated prevrevid.
If not present, return `missing'.
This method serves to allow transactionalUndo() to find undone
transactions that have been committed to the log, but not to the
database (i.e. multiple transactionalUndo()'s during a single
transaction).
"""
return self.__prevrevids.get(oid, missing)
# read/write protocol
def write_object(self, oid, vid, nvrevid, refdoids, prevrevid):
# Write an empty lrevid since that will be the same as the transaction
# id at the time of the commit to Berkeley.
#
# Since we're now writing the pickles directly to Berkeley instead of
# logging them, we don't need to store the pickle data here. Instead,
# we'll write the list of oids referenced by the data, which will be
# useful during _finish()
self._append('x', (oid, vid, nvrevid, '', refdoids, prevrevid))
def write_nonversion_object(self, oid, lrevid, prevrevid, zero='\0'*8):
# Write zeros for the vid and nvrevid since we're storing this object
# into version zero (the non-version). Also, write an empty pickle
# since we'll reuse one already in the pickle table.
self._append('a', (oid, zero, zero, lrevid, None, prevrevid))
def write_moved_object(self, oid, vid, nvrevid, lrevid, prevrevid):
# Write an empty pickle since we're just moving the object and we'll
# reuse the pickle already in the database.
self._append('o', (oid, vid, nvrevid, lrevid, None, prevrevid))
def write_object_undo(self, oid, vid, nvrevid, lrevid, prevrevid):
# Identical to write_moved_object() except that we have to keep some
# extra info around. Specifically, it's possible to undo multiple
# transactions in the same transaction.
self._append('o', (oid, vid, nvrevid, lrevid, None, prevrevid))
self.__prevrevids[oid] = prevrevid
def write_new_version(self, version, vid):
self._append('v', (version, vid))
def write_discard_version(self, vid):
self._append('d', (vid,))
def next(self):
# Get the next object record. Return the key for unpacking and the
# object record data.
rec = self._next()
if rec is None:
return None
try:
key, data = rec
except ValueError:
raise LogCorruptedError, 'incomplete record'
if key not in 'xovdra':
raise LogCorruptedError, 'bad record key: %s' % key
return key, data
# objects.
while deltas:
deltas = self._update_refcounts(deltas, txn)
`serial' is the serial number of the object. If it does not match the def _update_refcounts(self, deltas, txn):
stored serial number, a ConflictError is raised. newdeltas = {}
for oid, delta in deltas.items():
rc = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
assert rc >= 0
if rc == 0:
# The reference count for this object has just gone to zero,
# so we can safely remove all traces of it from the serials,
# pickles and refcounts table. Note that before we remove its
# pickle, we need to decref all the objects referenced by it.
current = self._getCurrentSerial(oid)
data = self._pickles.get(oid+current, txn=txn)
self._update(newdeltas, data, -1)
# And delete the serials, pickle and refcount entries. At
# this point, I believe we should have just one serial entry.
self._serials.delete(oid, txn=txn)
assert self._serials.get(oid, txn=txn) is None
self._refcounts.delete(oid, txn=txn)
self._pickles.delete(oid+current, txn=txn)
else:
self._refcounts.put(oid, p64(rc), txn=txn)
# Return the list of objects referenced by pickles just deleted in
# this round, for decref'ing on the next go 'round.
return newdeltas
`data' is object's data pickle. def _begin(self, tid, u, d, e):
# When a transaction begins, we set the pending flag to ABORT,
# meaning, if we crash between now and the time we vote, all changes
# will be aborted.
self._pending[self._serial] = ABORT
`version' is required by the storage interface, but it must be set to def store(self, oid, serial, data, version, transaction):
None because undo and versions are not supported.
"""
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
# We don't support versions
# Make sure the version is false. Usually it's an empty string, but if version <> '':
# we have to make sure. raise POSException.Unsupported, 'versions are not supported'
if version: # All updates must be done with the application lock acquired
raise POSException.Unsupported, "Versions aren't supported"
self._lock_acquire() self._lock_acquire()
try: try:
oserial = self._serials.get(oid) oserial = self._getCurrentSerial(oid)
if oserial is not None and serial <> oserial: if oserial is not None and serial <> oserial:
# The object exists in the database, but the serial number # The object exists in the database, but the serial number
# given in the call is not the same as the last stored serial # given in the call is not the same as the last stored serial
# number. Raise a ConflictError. # number. Raise a ConflictError.
raise POSException.ConflictError( #
serials=(oserial, serial)) # BAW: do application level conflict resolution
# Our serial number is updated in BaseStorage's tpc_begin() call, raise POSException.ConflictError(serials=(oserial, serial))
# which sets the serial number to the current timestamp. # Optimistically write to the serials and pickles table. Be sure
serial = self._serial # to also update the oids table for this object too.
# Write the object's pickle data to the commit log file newserial = self._serial
self._commitlog.write_object(oid, data) txn = self._env.txn_begin()
try:
self._serials.put(oid, newserial, txn=txn)
self._pickles.put(oid+newserial, data, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
except:
txn.abort()
self._docheckpoint()
raise
else:
txn.commit()
self._docheckpoint()
finally: finally:
self._lock_release() self._lock_release()
# Return our cached serial number for the object # Return the new serial number for the object
return serial return newserial
def pack(self, t, getrefsfunc): def _finish(self, tid, u, d, e):
"""Pack the storage. # Twiddle the pending flag to COMMIT now since after the vote call, we
# promise that the changes will be committed, no matter what. The
# recovery process will check this.
self._pending[self._serial] = COMMIT
self._do(self._docommit, self._serial)
Since this storage does not support versions, packing serves only to def _abort(self):
remove any objects that are not referenced from the root of the tree self._do(self._doabort, self._serial)
(i.e. they are garbage collected).
BAW: where are `t' and `getrefsfunc' defined in the model? And #
exactly what are their purpose and semantics? # Accessor interface
""" #
self._lock_acquire()
def _getCurrentSerial(self, oid):
# BAW: We must have the application level lock here.
c = self._serials.cursor()
try: try:
# Build an index only of those objects reachable from the root. # There can be zero, one, or two entries in the serials table for
# Unfortunately, we do this in memory, so the memory footprint of # this oid. If there are no entries, raise a KeyError (we know
# packing may still be substantial. # nothing about this object).
# #
# Known root objects are kept in this list and as new ones are # If there is exactly one entry then this has to be the entry for
# found, their oids are pushed onto the front of the list. It is # the object, regardless of the pending flag.
# also added to the seen dictionary, which keeps track of objects #
# we've seen already. When roots is empty, we're done visiting # If there are two entries, then we need to look at the pending
# all the objects. # flag to decide which to return (there /better/ be a pending flag
roots = ['\0\0\0\0\0\0\0\0'] # set!). If the pending flag is COMMIT then we've already voted
seen = {} # so the second one is the good one. If the pending flag is ABORT
while roots: # then we haven't yet committed to this transaction so the first
# Get the next oid from the roots list # one is the good one.
oid = roots.pop() serials = []
# Skip it if we've already seen it try:
if seen.has_key(oid): rec = c.set(oid)
continue except db.DBNotFoundError:
# Get the pickle corresponding to the object id and scan it rec = None
# for references to other objects. This is done by the while rec:
# magical `getrefsfunc' function given as an argument. serials.append(rec[1])
pickle = self._pickles[oid] rec = c.next_dup()
seen[oid] = 1 if not serials:
# This will prepend any new oids we'll need to scan return None
getrefsfunc(pickle, roots) if len(serials) == 1:
# Now, go through every oid for which we have a pickle, and if we return serials[0]
# have not seen it, then it must be garbage (because it was never pending = self._pending.get(self._serial)
# reached from one of the roots). In that case, delete its entry assert pending in (ABORT, COMMIT)
# in the pickle index. if pending == ABORT:
for oid in self._pickles.keys(): return serials[0]
if not seen.has_key(oid): return serials[1]
del self._pickles[oid] finally:
c.close()
def load(self, oid, version):
if version <> '':
raise POSException.Unsupported, 'versions are not supported'
self._lock_acquire()
try:
# Get the current serial number for this object
serial = self._getCurrentSerial(oid)
if serial is None:
raise KeyError, 'Object does not exist: %r' % oid
# Get this revision's pickle data
return self._pickles[oid+serial], serial
finally: finally:
self._lock_release() self._lock_release()
def modifiedInVersion(self, oid):
# So BaseStorage.getSerial() just works. Note that this storage
# doesn't support versions.
return ''
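
# Illustration only: a tiny standalone sketch (plain dicts in place of the
# Berkeley DB tables, hypothetical names throughout) of the bookkeeping the
# comments above describe -- an oid may briefly map to two serials, and the
# pending flag decides which one is current until the commit prunes the rest.

SKETCH_ABORT, SKETCH_COMMIT = 'A', 'C'

class PendingFlagSketch:
    def __init__(self):
        self.serials = {}       # oid -> [serial, ...] (one or two entries)
        self.pickles = {}       # (oid, serial) -> pickle data
        self.pending = {}       # tid -> SKETCH_ABORT | SKETCH_COMMIT

    def begin(self, tid):
        # Before the vote, a crash means the optimistic writes are thrown away.
        self.pending[tid] = SKETCH_ABORT

    def store(self, oid, data, tid):
        # Optimistic write: the new serial sits next to the committed one.
        self.serials.setdefault(oid, []).append(tid)
        self.pickles[(oid, tid)] = data

    def current_serial(self, oid, tid):
        serials = self.serials.get(oid, [])
        if not serials:
            return None
        if len(serials) == 1:
            return serials[0]
        # Two entries: the pending flag picks the winner.
        if self.pending.get(tid) == SKETCH_COMMIT:
            return serials[1]
        return serials[0]

    def finish(self, oid, tid):
        # Vote, then drop every revision other than the one just committed.
        self.pending[tid] = SKETCH_COMMIT
        for old in [s for s in self.serials[oid] if s != tid]:
            del self.pickles[(oid, old)]
        self.serials[oid] = [tid]
        del self.pending[tid]

# Two transactions against one object: before the second vote the first
# revision is still current; after finish() only the new one remains.
sketch = PendingFlagSketch()
sketch.begin('t1'); sketch.store('obj', 'rev-1', 't1'); sketch.finish('obj', 't1')
sketch.begin('t2'); sketch.store('obj', 'rev-2', 't2')
assert sketch.current_serial('obj', 't2') == 't1'
sketch.finish('obj', 't2')
assert sketch.current_serial('obj', 't2') == 't2'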
...@@ -23,31 +23,35 @@ DBHOME = 'test-db'
class BerkeleyTestBase(StorageTestBase):
    def _zap_dbhome(self, dir):
        # If the tests exited with any uncommitted objects, they'll blow up
        # subsequent tests because the next transaction commit will try to
        # commit those object.  But they're tied to closed databases, so
        # that's broken.  Aborting the transaction now saves us the headache.
        try:
            for file in os.listdir(dir):
                os.unlink(os.path.join(dir, file))
            os.removedirs(dir)
        except OSError, e:
            if e.errno <> errno.ENOENT:
                raise

    def _mk_dbhome(self, dir):
        os.mkdir(dir)
        try:
            return self.ConcreteStorage(dir)
        except:
            self._zap_dbhome(dir)
            raise

    def setUp(self):
        StorageTestBase.setUp(self)
        self._zap_dbhome(DBHOME)
        self._storage = self._mk_dbhome(DBHOME)

    def tearDown(self):
        StorageTestBase.tearDown(self)
        self._zap_dbhome(DBHOME)
...
...@@ -27,8 +27,8 @@ DBHOME = 'test-db'
class ZODBTestBase(BerkeleyTestBase):
    def setUp(self):
        BerkeleyTestBase.setUp(self)
        self._db = None
        try:
            self._db = DB(self._storage)
            self._conn = self._db.open()
            self._root = self._conn.root()
...
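
# Illustration only (not part of the shipped test suite): a hypothetical
# concrete test case showing how these base classes are meant to be used --
# point ConcreteStorage at a storage class and let BerkeleyTestBase's
# setUp()/tearDown() create and destroy the DBHOME directory per test, the
# same wiring the whitebox tests further down rely on.
import unittest

from bsddb3Storage.Minimal import Minimal
from bsddb3Storage.tests.BerkeleyTestBase import BerkeleyTestBase

class MinimalSmokeTest(BerkeleyTestBase):
    ConcreteStorage = Minimal

    def checkStoreThenLoadReturnsSameSerial(self):
        # _dostore() (from StorageTestBase) commits one revision and returns
        # its serial; load() must report that same serial as current.
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=7)
        data, serial = self._storage.load(oid, '')
        self.assertEqual(serial, revid)

def sketch_suite():
    return unittest.makeSuite(MinimalSmokeTest, 'check')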
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Test the operation of the CommitLog classes
import os
import errno
import unittest
from bsddb3Storage import CommitLog
# BAW: Lots of other things to check:
# - creating with a named directory
# - creating with an existing file via filename
# - creating with a file object with incorrect mode or permissions
# - creating with a file object raising the two flavors of LogCorruptedError
# - The various forms of LogCorruptedError in PacklessLog.next()
class CreateCommitLogTest(unittest.TestCase):
def checkCreateNoFile(self):
unless = self.failUnless
log = CommitLog.CommitLog()
filename = log.get_filename()
try:
unless(os.path.exists(filename))
finally:
log.close(unlink=1)
unless(not os.path.exists(filename))
def checkCreateWithFilename(self):
unless = self.failUnless
filename = 'commit.log'
log = CommitLog.CommitLog(filename)
try:
unless(os.path.exists(filename))
finally:
log.close(unlink=1)
unless(not os.path.exists(filename))
def checkCreateWithFileobj(self):
filename = 'commit.log'
fp = open(filename, 'w+b')
try:
self.assertRaises(CommitLog.TruncationError,
CommitLog.CommitLog, fp)
finally:
fp.close()
self.failUnless(not os.path.exists(filename))
def checkCloseDoesUnlink(self):
log = CommitLog.CommitLog()
filename = log.get_filename()
log.close()
self.failUnless(not os.path.exists(filename))
def checkDel(self):
log = CommitLog.CommitLog()
filename = log.get_filename()
del log
self.failUnless(not os.path.exists(filename))
class BaseSetupTearDown(unittest.TestCase):
def setUp(self):
self._log = CommitLog.CommitLog()
def tearDown(self):
try:
self._log.close(unlink=1)
except OSError, e:
if e.errno <> errno.ENOENT: raise
class CommitLogStateTransitionTest(BaseSetupTearDown):
def checkProperStart(self):
# BAW: best we can do is make sure we can start a new commit log
self._log.start()
def checkAppendSetsOpen(self):
# BAW: Best we can do is assert that the state isn't START
self._log._append('x', 'ignore')
self.assertRaises(CommitLog.StateTransitionError, self._log.start)
def checkPromiseSetsPromise(self):
# BAW: Best we can do is assert that state isn't START
self._log.promise()
self.assertRaises(CommitLog.StateTransitionError, self._log.start)
def checkBadDoublePromise(self):
self._log.promise()
self.assertRaises(CommitLog.StateTransitionError, self._log.promise)
def checkFinishSetsStart(self):
self._log.finish()
# BAW: best we can do is make sure we can start a new commit log
self._log.start()
# Wouldn't it be nice to have generators? :)
class Gen:
def __init__(self):
self.__counter = 0
def __call__(self):
try:
return self[self.__counter]
finally:
self.__counter = self.__counter + 1
def __getitem__(self, i):
if 0 <= i < 10:
return chr(i+65), i
raise IndexError
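
# Usage note (illustration only): because Gen defines __getitem__ over 0..9,
# the old sequence protocol drives "for k, v in Gen()", and calling an
# instance steps its private counter until IndexError -- the poor man's
# generator the comment above is asking for.
def _demo_gen():
    pairs = [kv for kv in Gen()]        # ('A', 0) through ('J', 9)
    assert len(pairs) == 10 and pairs[0] == ('A', 0)
    g = Gen()
    assert g() == ('A', 0)
    assert g() == ('B', 1)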
class LowLevelStoreAndLoadTest(BaseSetupTearDown):
def checkOneStoreAndLoad(self):
eq = self.assertEqual
self._log.start()
self._log._append('x', 'ignore')
self._log.promise()
x, ignore = self._log._next()
eq(x, 'x')
eq(ignore, 'ignore')
eq(self._log._next(), None)
def checkTenStoresAndLoads(self):
eq = self.assertEqual
self._log.start()
for k, v in Gen():
self._log._append(k, v)
self._log.promise()
g = Gen()
while 1:
rec = self._log._next()
if rec is None:
break
c, i = g()
eq(rec[0], c)
eq(rec[1], i)
self.assertRaises(IndexError, g)
class PacklessLogTest(BaseSetupTearDown):
def setUp(self):
self._log = CommitLog.PacklessLog()
self._log.start()
def checkOneStoreAndLoad(self):
eq = self.assertEqual
self._log.write_object(oid=10, pickle='ignore')
self._log.promise()
oid, pickle = self._log.next()
eq(oid, 10)
eq(pickle, 'ignore')
eq(self._log.next(), None)
def checkTenStoresAndLoads(self):
eq = self.assertEqual
for k, v in Gen():
self._log.write_object(v, k*10)
self._log.promise()
g = Gen()
while 1:
rec = self._log.next()
if rec is None:
break
c, i = g()
oid, pickle = rec
eq(oid, i)
eq(pickle, c*10)
self.assertRaises(IndexError, g)
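
# Illustration only (mirrors checkOneStoreAndLoad plus the tearDown above,
# with made-up oid/pickle values): one packless commit-log round trip --
# start, write under a promise, drain with next(), then unlink the scratch
# file.
def _demo_packless_roundtrip():
    log = CommitLog.PacklessLog()
    log.start()
    try:
        log.write_object(oid=10, pickle='ignore')
        log.promise()
        oid, pickle = log.next()
        assert (oid, pickle) == (10, 'ignore')
        assert log.next() is None
    finally:
        log.close(unlink=1)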
class FullLogTest(BaseSetupTearDown):
def setUp(self):
self._log = CommitLog.FullLog()
self._log.start()
def checkOneStoreAndLoad(self):
eq = self.assertEqual
oid = 10
vid = 8
nvrevid = 0
pickle = 'ignore'
prevrevid = 9
self._log.write_object(oid, vid, nvrevid, pickle, prevrevid)
self._log.promise()
rec = self._log.next()
self.failUnless(rec)
key, rec = rec
eq(key, 'x')
eq(len(rec), 6)
eq(rec, (oid, vid, nvrevid, '', pickle, prevrevid))
eq(self._log.next(), None)
def checkOtherWriteMethods(self):
eq = self.assertEqual
unless = self.failUnless
oid = 10
vid = 1
nvrevid = 0
lrevid = 8
pickle = 'ignore'
prevrevid = 9
version = 'new-version'
zero = '\0'*8
self._log.write_nonversion_object(oid, lrevid, prevrevid)
self._log.write_moved_object(oid, vid, nvrevid, lrevid, prevrevid)
self._log.write_new_version(version, vid)
self._log.write_discard_version(vid)
self._log.promise()
rec = self._log.next()
unless(rec)
key, rec = rec
eq(key, 'a')
eq(len(rec), 6)
eq(rec, (oid, zero, zero, lrevid, None, prevrevid))
rec = self._log.next()
unless(rec)
key, rec = rec
eq(key, 'o')
eq(len(rec), 6)
eq(rec, (oid, vid, nvrevid, lrevid, None, prevrevid))
rec = self._log.next()
unless(rec)
key, rec = rec
eq(key, 'v')
eq(len(rec), 2)
eq(rec, (version, vid))
rec = self._log.next()
unless(rec)
key, rec = rec
eq(key, 'd')
eq(len(rec), 1)
eq(rec, (vid,))
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(CreateCommitLogTest, 'check'))
suite.addTest(unittest.makeSuite(CommitLogStateTransitionTest, 'check'))
suite.addTest(unittest.makeSuite(LowLevelStoreAndLoadTest, 'check'))
suite.addTest(unittest.makeSuite(PacklessLogTest, 'check'))
suite.addTest(unittest.makeSuite(FullLogTest, 'check'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
...@@ -30,7 +30,8 @@ from ZODB.tests.TransactionalUndoVersionStorage import \
    TransactionalUndoVersionStorage
from ZODB.tests.PackableStorage import PackableStorage
from ZODB.tests.HistoryStorage import HistoryStorage
from ZODB.tests.IteratorStorage import IteratorStorage, ExtendedIteratorStorage
from ZODB.tests.RecoveryStorage import RecoveryStorage
from ZODB.tests import ConflictResolution
...@@ -47,17 +48,28 @@ class MinimalTest(BerkeleyTestBase.MinimalTestBase, BasicStorage):
class FullTest(BerkeleyTestBase.FullTestBase, BasicStorage,
               RevisionStorage, VersionStorage,
               TransactionalUndoStorage,
               TransactionalUndoVersionStorage,
               PackableStorage,
               HistoryStorage,
               IteratorStorage, ExtendedIteratorStorage,
               ConflictResolution.ConflictResolvingStorage,
               ConflictResolution.ConflictResolvingTransUndoStorage):
    pass


DST_DBHOME = 'test-dst'


class FullRecoveryTest(BerkeleyTestBase.FullTestBase,
                       RecoveryStorage):
    def setUp(self):
        BerkeleyTestBase.FullTestBase.setUp(self)
        self._zap_dbhome(DST_DBHOME)
        self._dst = self._mk_dbhome(DST_DBHOME)

    def tearDown(self):
        BerkeleyTestBase.FullTestBase.tearDown(self)
        self._zap_dbhome(DST_DBHOME)

    def checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(self):
...@@ -78,9 +90,10 @@ class AutopackTest(BerkeleyTestBase.AutopackTestBase, BasicStorage):
def test_suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(FullTest, 'check'))
    suite.addTest(unittest.makeSuite(FullRecoveryTest, 'check'))
    suite.addTest(unittest.makeSuite(MinimalTest, 'check'))
    #suite.addTest(unittest.makeSuite(AutopackTest, 'check'))
    return suite
...
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Whitebox testing of storage implementation details.
import unittest
from ZODB.utils import U64
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
from bsddb3Storage.Minimal import Minimal
from bsddb3Storage.Full import Full
from bsddb3Storage.tests.BerkeleyTestBase import BerkeleyTestBase
from bsddb3Storage.tests.ZODBTestBase import ZODBTestBase
from Persistence import Persistent
ZERO = '\0'*8
class Object(Persistent):
pass
class WhiteboxLowLevelMinimal(BerkeleyTestBase):
ConcreteStorage = Minimal
def checkTableConsistencyAfterCommit(self):
unless = self.failIf
eq = self.assertEqual
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=11)
revid2 = self._dostore(oid, revid=revid1, data=12)
revid3 = self._dostore(oid, revid=revid2, data=13)
# First off, there should be no entries in the pending table
unless(self._storage._pending.keys())
# Also, there should be no entries in the oids table
unless(self._storage._oids.keys())
# Now, there should be exactly one oid in the serials table, and
# exactly one record for that oid in the table too.
oids = {}
c = self._storage._serials.cursor()
try:
rec = c.first()
while rec:
oid, serial = rec
oids.setdefault(oid, []).append(serial)
rec = c.next()
finally:
c.close()
eq(len(oids), 1)
eq(len(oids[oids.keys()[0]]), 1)
# There should now be exactly one entry in the pickles table.
pickles = self._storage._pickles.items()
eq(len(pickles), 1)
key, data = pickles[0]
poid = key[:8]
pserial = key[8:]
eq(oid, poid)
eq(revid3, pserial)
obj = zodb_unpickle(data)
eq(obj.value, 13)
# Now verify the refcounts table, which should be empty because the
# stored object isn't referenced by any other objects.
eq(len(self._storage._refcounts.keys()), 0)
class WhiteboxHighLevelMinimal(ZODBTestBase):
ConcreteStorage = Minimal
def checkReferenceCounting(self):
eq = self.assertEqual
obj = MinPO(11)
self._root.obj = obj
get_transaction().commit()
obj.value = 12
get_transaction().commit()
obj.value = 13
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 2)
eq(len(self._storage._pickles.items()), 2)
# And now refcount out the object
del self._root.obj
get_transaction().commit()
# Verification stage. Our serials table should have exactly one
# entry, oid == 0
keys = self._storage._serials.keys()
eq(len(keys), 1)
eq(len(self._storage._serials.items()), 1)
eq(keys[0], ZERO)
# The pickles table now should have exactly one revision of the root
# object, and no revisions of the MinPO object, which should have been
# collected away.
pickles = self._storage._pickles.items()
eq(len(pickles), 1)
rec = pickles[0]
key = rec[0]
data = rec[1]
eq(key[:8], ZERO)
# And that pickle should have no 'obj' attribute.
unobj = zodb_unpickle(data)
self.failIf(hasattr(unobj, 'obj'))
# Our refcounts table should have no entries in it, because the root
# object is an island.
eq(len(self._storage._refcounts.keys()), 0)
# And of course, oids and pendings should be empty too
eq(len(self._storage._oids.keys()), 0)
eq(len(self._storage._pending.keys()), 0)
def checkRecursiveReferenceCounting(self):
eq = self.assertEqual
obj1 = Object()
obj2 = Object()
obj3 = Object()
obj4 = Object()
self._root.obj = obj1
obj1.obj = obj2
obj2.obj = obj3
obj3.obj = obj4
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 5)
eq(len(self._storage._pickles.items()), 5)
# And now refcount out the object
del self._root.obj
get_transaction().commit()
# Verification stage. Our serials table should have exactly one
# entry, oid == 0
keys = self._storage._serials.keys()
eq(len(keys), 1)
eq(len(self._storage._serials.items()), 1)
eq(keys[0], ZERO)
# The pickles table now should have exactly one revision of the root
# object, and no revisions of any other objects, which should have
# been collected away.
pickles = self._storage._pickles.items()
eq(len(pickles), 1)
rec = pickles[0]
key = rec[0]
data = rec[1]
eq(key[:8], ZERO)
# And that pickle should have no 'obj' attribute.
unobj = zodb_unpickle(data)
self.failIf(hasattr(unobj, 'obj'))
# Our refcounts table should have no entries in it, because the root
# object is an island.
eq(len(self._storage._refcounts.keys()), 0)
# And of course, oids and pendings should be empty too
eq(len(self._storage._oids.keys()), 0)
eq(len(self._storage._pending.keys()), 0)
class WhiteboxHighLevelFull(ZODBTestBase):
ConcreteStorage = Full
def checkReferenceCounting(self):
eq = self.assertEqual
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 1)
eq(len(self._storage._pickles.items()), 1)
# Now store an object
obj = MinPO(11)
self._root.obj = obj
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 2)
eq(len(self._storage._pickles.items()), 3)
obj.value = 12
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 2)
eq(len(self._storage._pickles.items()), 4)
obj.value = 13
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 2)
eq(len(self._storage._pickles.items()), 5)
# And now refcount out the object
del self._root.obj
get_transaction().commit()
# Verification stage. Our serials table should still have 2 entries,
# one for the root object and one for the now unlinked MinPO obj.
keys = self._storage._serials.keys()
eq(len(keys), 2)
eq(len(self._storage._serials.items()), 2)
eq(keys[0], ZERO)
# The pickles table should now have 6 entries, broken down like so:
# - 3 revisions of the root object: the initial database-open
# revision, the revision that got its obj attribute set, and the
# revision that got its obj attribute deleted.
# - 3 revisions of obj, corresponding to values 11, 12, and 13
pickles = self._storage._pickles.items()
eq(len(pickles), 6)
# Our refcounts table should have one entry in it for the MinPO that's
# referenced in an earlier revision of the root object
eq(len(self._storage._refcounts.keys()), 1)
# And of course, oids and pendings should be empty too
eq(len(self._storage._oids.keys()), 0)
eq(len(self._storage._pending.keys()), 0)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(WhiteboxLowLevelMinimal, 'check'))
suite.addTest(unittest.makeSuite(WhiteboxHighLevelMinimal, 'check'))
suite.addTest(unittest.makeSuite(WhiteboxHighLevelFull, 'check'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')