Commit 1f28c122 authored by Barry Warsaw

BerkeleyBase.py -- contains a rewritten base class for both the Full
and Minimal Berkeley storages.  It derives from
ZODB.BaseStorage.BaseStorage and supports the tpc_vote() method
wrapper calling _vote() on the derived class.

CommitLog.py -- contains the temporary transaction commit log file
classes used to hold transaction changes until they can be committed
to Berkeley.
parent 73e8930f
"""Base class for BerkeleyStorage implementations.
"""
import os
import errno
from types import StringType
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net
from bsddb3 import db
# BaseStorage provides some common storage functionality. It is derived from
# UndoLogCompatible.UndoLogCompatible, which "[provides] backward
# compatability with storages that have undoLog, but not undoInfo."
#
# BAW: I'm not entirely sure what that means, but the UndoLogCompatible
# subclass provides one method:
#
# undoInfo(first, last, specification). Unfortunately this method appears to
# be undocumented. Jeremy tells me it's still required though.
#
# BaseStorage provides primitives for lock acquisition and release,
# abortVersion(), commitVersion() and a host of other methods, some of which
# are overridden here, some of which are not.
from ZODB.BaseStorage import BaseStorage
__version__ = '$Revision: 1.1 $'[11:-2]
class BerkeleyBase(BaseStorage):
"""Base storage for Minimal and Full Berkeley implementations."""
def __init__(self, name, env=None, prefix="zodb_"):
"""Create a new storage.
name is an arbitrary name for this storage. It is returned by the
getName() method.
env is the database environment name, used to handle more advanced
BSDDB functionality such as transactions. If env is a non-empty
string, it is passed directly to DbEnv().open(), which in turn is
passed to the BSDDB function DBEnv->open() as the db_home parameter.
If env is not a string, it must be an already existing DbEnv()
object.
prefix is the string to prepend to name when passed to DB.open() as
the dbname parameter. IOW, prefix+name is passed to the BSDDB
function DB->open() as the database parameter. It defaults to
"zodb_".
"""
# sanity check arguments
if name == '':
raise TypeError, 'database name is empty'
if env is None:
env = name
if isinstance(env, StringType):
if env == '':
raise TypeError, 'environment name is empty'
env = env_from_string(env)
elif not isinstance(env, db.DBEnv):
raise TypeError, 'env must be a string or DBEnv instance: %s' % env
BaseStorage.__init__(self, name)
# Initialize a few other things
self._env = env
self._prefix = prefix
self._commitlog = None
# Give the subclasses a chance to interpose into the database setup
# procedure
self._setupDBs()
# Initialize the object id counter.
self._init_oid()
def _setupDB(self, name, flags=0):
"""Open an individual database with the given flags.
flags are passed directly to the underlying DB.set_flags() call.
"""
d = db.DB(self._env)
if flags:
d.set_flags(flags)
# Our storage is based on the underlying BSDDB btree database type.
d.open(self._prefix + name, db.DB_BTREE, db.DB_CREATE)
return d
def _setupDBs(self):
"""Set up the storages databases, typically using '_setupDB'.
This must be implemented in a subclass.
"""
raise NotImplementedError, '_setupDBs()'
def _init_oid(self):
"""Initialize the object id counter."""
# If the `serials' database is non-empty, the last object id in the
# database will be returned (as a [key, value] pair). Use it to
# initialize the object id counter.
#
# If the database is empty, just initialize it to zero.
value = self._serials.cursor().last()
if value:
self._oid = value[0]
else:
self._oid = '\0\0\0\0\0\0\0\0'
# It can be very expensive to calculate the "length" of the database, so
# we cache the length and adjust it as we add and remove objects.
_len = None
def __len__(self):
"""Return the number of objects in the index."""
if self._len is None:
# The cache has never been initialized. Do it once the expensive
# way.
self._len = len(self._serials)
return self._len
def new_oid(self, last=None):
"""Create a new object id.
If last is provided, the new oid will be one greater than that.
"""
# BAW: the last parameter is undocumented in the UML model
if self._len is not None:
# Increment the cached length
self._len = self._len + 1
return BaseStorage.new_oid(self, last)
def getSize(self):
"""Return the size of the database."""
# TBD: this is expensive to calculate and may not be necessary.
return 0
def tpc_vote(self, transaction):
# BAW: This wrapper framework should probably be in BaseStorage's
# tpc_vote()
self._lock_acquire()
try:
self._vote(transaction)
finally:
self._lock_release()
def _vote(self, transaction):
pass
def _finish(self, tid, user, desc, ext):
"""Called from BaseStorage.tpc_finish(), this commits the underlying
BSDDB transaction.
tid is the transaction id
user is the transaction user
desc is the transaction description
ext is the transaction extension
These are all ignored.
"""
self._txn.commit()
def _abort(self, tid, user, desc, ext):
"""Called from BaseStorage.tpc_abort(), this aborts the underlying
BSDDB transaction.
tid is the transaction id
user is the transaction user
desc is the transaction description
ext is the transaction extension
These are all ignored.
"""
# BAW: this appears to be broken. Look in BaseStorage.tpc_abort();
# _abort() is never called with any arguments. :/
self._txn.abort()
def _clear_temp(self):
"""Called from BaseStorage.tpc_abort(), BaseStorage.tpc_begin(),
BaseStorage.tpc_finish(), this clears out the temporary log file
"""
# BAW: no-op this since the right CommitLog file operations are
# performed by the methods in the derived storage class.
pass
def close(self):
"""Close the storage by closing the databases it uses and by closing
its environment.
"""
self._env.close()
# BAW: the original implementation also deleted the _env attribute.
# Was this just to reclaim the garbage?
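# An illustrative sketch (not part of this module) of how a derived storage
# might plug into BerkeleyBase.  A subclass only has to provide _setupDBs(),
# typically as one _setupDB() call per table.  A `serials' table is assumed
# here because _init_oid() expects self._serials; everything else (names,
# paths) is hypothetical:
#
#     class ExampleStorage(BerkeleyBase):
#         def _setupDBs(self):
#             # each call opens prefix+name as a BTree in our environment
#             self._serials = self._setupDB('serials')
#             self._pickles = self._setupDB('pickles')
#
#     storage = ExampleStorage('example', env='/var/tmp/example-env')
#     print storage.getName()                   # -> 'example'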
def env_from_string(envname):
# BSDDB requires that the directory already exists. BAW: do we need to
# adjust umask to ensure filesystem permissions?
try:
os.mkdir(envname)
except OSError, e:
if e.errno <> errno.EEXIST: raise
# already exists
env = db.DBEnv()
env.open(envname,
db.DB_CREATE # create underlying files as necessary
| db.DB_RECOVER # run normal recovery before opening
| db.DB_INIT_MPOOL # initialize shared memory buffer pool
| db.DB_INIT_LOCK # initialize locking subsystem
| db.DB_INIT_TXN # initialize transaction subsystem
)
return env
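# A short usage sketch for env_from_string(), with a hypothetical path.  The
# returned DBEnv can be handed to BerkeleyBase's constructor, or used to open
# databases directly with the bsddb3 bindings imported above:
#
#     env = env_from_string('/var/tmp/zodb-env')
#     d = db.DB(env)
#     d.open('zodb_example', db.DB_BTREE, db.DB_CREATE)
#     d.put('key', 'value')
#     print d.get('key')                        # -> 'value'
#     d.close()
#     env.close()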
# CommitLog class
#
# This class implements the action log for writes to non-committed
# transactions. They are replayed and applied all at once during _finish()
# which is called by BaseStorage's tpc_finish(). See FullImplementation.txt
# and notes for some discussion of the issues involved.
#
# BAW: understand this more, and figure out why we can't use BSDDB's
# lock_detect().
#
# File format:
#
# The log file consists of a standard header, followed by a number of marshal
# records. Each marshal consists of a single character opcode and an
# argument. The specific opcodes and arguments depend on the type of storage
# using the CommitLog instance, and derived classes provide a more specific
# interface for the storage.
import sha
import struct
import os
import time
import marshal
import errno
from types import StringType
# JF: POSError is the ZODB version of Exception; it's fairly generic
# so a more specific exception might be better. E.g. StorageError
from ZODB.POSException import POSError
# Log file states.
#
# START is the transaction start state; the log file must either not exist or
# have finished (committed or aborted) its previous transaction in order to
# enter the START state.
START = 'S'
# OPEN state is where objects have begun to be stored into the log file during
# an open transaction. OPEN can only be entered from the START state, and
# upon the first object store, the state goes from START->OPEN. From here the
# transaction will either be committed or aborted.
OPEN = 'O'
# If the transaction is aborted, everything is discarded and the commit log is
# moved to the START state. This can only happen from the START or OPEN
# state. If the transaction is finished, then we are guaranteeing that the
# stored objects will be saved to the back-end storage. In that case we
# change the state to PROMISED and allow the storage to read the objects out
# again.
#
# Once the transaction commit has succeeded, the file's state is moved back to
# START for the next transaction.
PROMISED = 'P'
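# For reference, the legal state transitions described above, written out as
# a sketch of a transition table (the class below enforces these inline
# rather than consulting such a table):
#
#     _LEGAL_TRANSITIONS = {
#         START:    (OPEN, PROMISED),  # first store, or promise with no stores
#         OPEN:     (PROMISED, START), # promise the changes, or abort them
#         PROMISED: (START,),          # finish (or abort) completes the cycle
#     }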
# Magic number for log file header
MAGIC = 0xfeedb00bL
# Version number
SCHEMA = 0x01
# The format of the file header. It consists of:
# - a 32 bit magic number
# - the 16-bit schema number
# - the single character commit log state flag
FMT = '>IHc'
FMTSZ = struct.calcsize(FMT)
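# A concrete sketch of the 7-byte header this produces, using the constants
# above (standard sizes, big-endian):
#
#     >>> import struct
#     >>> struct.pack(FMT, MAGIC, SCHEMA, START)
#     '\xfe\xed\xb0\x0b\x00\x01S'
#     >>> FMTSZ
#     7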
class CommitLogError(POSError):
"""Base class for errors raised by the CommitLog class."""
class TruncationError(CommitLogError):
"""A log file truncation was detected on a header read."""
class LogCorruptedError(CommitLogError):
"""A read of a data record was incomplete or corrupted."""
class StateTransitionError(CommitLogError):
"""An illegal state transition was attempted."""
def __init__(self, state):
self.__state = state
def __str__(self):
return 'at invalid state: %c' % self.__state
class UncommittedChangesError(StateTransitionError):
"""The commit log has uncommitted changes.
This exception indicates that a promised commit of object states was not
either aborted or finished committing. No further object updates will be
allowed until the log file is replayed and explicitly cleared.
"""
class CommitLog:
def __init__(self, file=None, dir='.'):
"""Initialize the commit log, usually with a new file.
This is not a real temporary file because if we crash before the
transaction is committed, we'll need to replay this log. However, we
also need to be especially careful about the mode this file is written
in, otherwise evil processes could snoop information.
If `file' is provided it must be an already open file-like object that
supports seek() and appending r/w in binary mode.
Or `file' can be the name of a file which will be created with those
semantics. If `file' is omitted, we create a new such file, from a
(hopefully uniquely) crafted filename. In either of these two cases,
the filename is relative to dir (the default is the current
directory).
The commit file has a header consisting of the following information:
- a 32 bit magic number
- the 16-bit schema number
- the single character commit log state flag
Integers are standard size, big-endian.
"""
# BAW: is our filename unique enough? Are we opening it up with too
# much or too little security?
if file is None:
# Create the file from scratch. We know the file has to be in the
# init state, so just go ahead and write the appropriate header.
now = time.time()
pid = os.getpid()
file = sha.new(`now` + `pid`).hexdigest()
# BAW: what directory to create this in? /tmp doesn't seem right.
omask = os.umask(077) # -rw-------
try:
try:
os.makedirs(dir)
except OSError, e:
if e.errno <> errno.EEXIST: raise
self._fp = open(os.path.join(dir, file), 'w+b')
finally:
os.umask(omask)
self._writehead(START)
elif isinstance(file, StringType):
# Open the file in the proper mode. If it doesn't exist, write
# the start header. Note that mode `w+b' would truncate an existing
# file, so only use it when the file isn't already there.
omask = os.umask(077)
try:
try:
os.makedirs(dir)
except OSError, e:
if e.errno <> errno.EEXIST: raise
path = os.path.join(dir, file)
if os.path.exists(path):
self._fp = open(path, 'r+b')
else:
self._fp = open(path, 'w+b')
finally:
os.umask(omask)
# Attempt to read any existing header. If we get an error, assume
# the file was created from scratch and write the start header.
try:
self._readhead()
except TruncationError:
self._writehead(START)
else:
# File object was created externally; maybe we're replaying an old
# log. Read the file's header and initialize our state from it.
self._fp = file
self._readhead()
def get_filename(self):
return self._fp.name
def _writehead(self, state, pack=struct.pack):
# Scribble a new header onto the front of the file. state is the
# 1-character state flag.
assert len(state) == 1
self._state = state
data = pack(FMT, MAGIC, SCHEMA, state)
pos = self._fp.tell()
self._fp.seek(0)
self._fp.write(data)
self._fp.flush()
# Seek to the old file position, or just past the header, whichever is
# farther away.
self._fp.seek(max(self._fp.tell(), pos))
def _readhead(self, unpack=struct.unpack):
# Read the current file header, and return a tuple of the header data.
# If the file for some reason doesn't contain a complete header, a
# TruncationError is raised.
pos = self._fp.tell()
self._fp.seek(0)
header = self._fp.read(FMTSZ)
if len(header) <> FMTSZ:
raise TruncationError('short header read: %d bytes' % len(header))
try:
magic, schema, state = unpack(FMT, header)
except struct.error, e:
raise LogCorruptedError, e
if magic <> MAGIC:
raise LogCorruptedError, 'bad magic number: %x' % magic
#
# for now there's no backwards compatibility necessary
if schema <> SCHEMA:
raise LogCorruptedError, 'bad version number: %d' % schema
self._state = state
# Seek to the old file position, or just past the header, whichever is
# farther away.
self._fp.seek(max(self._fp.tell(), pos))
def _append(self, key, record, dump=marshal.dump):
# Store the next record in the file. Key is a single character
# marking the record type. Record must be a tuple of some record-type
# specific data. Record types and higher level write methods are
# defined in derived classes.
assert len(key) == 1
# Make assertions about the file's state
assert self._state in (START, OPEN, PROMISED)
if self._state == START:
self._writehead(OPEN)
elif self._state == OPEN:
pass
elif self._state == PROMISED:
raise UncommittedChangesError(
'Cannot write while promised updates remain uncommitted')
# We're good to go, append the object
self._fp.seek(0, 2) # to end of file
if self._fp.tell() < FMTSZ:
raise TruncationError, 'Last seek position < end of headers'
dump((key, record), self._fp)
self._fp.flush()
def start(self, load=marshal.load):
"""Move the file pointer to the start of the record data."""
self._readhead()
if self._state <> START:
raise StateTransitionError, self._state
self._fp.seek(FMTSZ)
def promise(self):
"""Move the transition to the PROMISED state, where we guarantee that
any changes to the object state will be committed to the backend
database before we ever write more updates.
"""
if self._state not in (START, OPEN):
raise StateTransitionError, self._state
self._writehead(PROMISED)
self._fp.seek(FMTSZ)
def finish(self):
"""We've finished committing all object updates to the backend
storage, or we're aborting the transaction.  In either case, we're done
with the data in our log file.  Move the log back to the START state.
"""
# We need to truncate the file after writing the header, for the
# algorithm above to work.
self._writehead(START)
self._fp.truncate()
def _next(self, load=marshal.load):
# Read the next marshal record from the log. If there is no next
# record return None, otherwise return a 2-tuple of the record type
# character and the data tuple.
try:
return load(self._fp)
except EOFError:
return None
# BAW: let TypeError percolate up.
def close(self, unlink=0):
"""Close the file.
If unlink is true, delete the underlying file object too.
"""
self._fp.close()
if unlink:
os.unlink(self._fp.name)
def __del__(self):
# Unsafe, file-preserving close
self.close()
class PacklessLog(CommitLog):
# Higher level interface for reading and writing version-less/undo-less
# log records.
#
# Record types:
# 'o' - object state, consisting of an oid, and the object's pickle
# data
#
def write_object(self, oid, pickle):
self._append('o', (oid, pickle))
def next_object(self):
# Get the next object pickle data. Return the oid and the pickle
# string. Raise a LogCorruptedError if there's an incomplete marshal
# record.
rec = self._next()
if rec is None:
return None
try:
key, (oid, pickle) = rec
except ValueError:
raise LogCorruptedError, 'incomplete record'
if key <> 'o':
raise LogCorruptedError, 'bad record key: %s' % key
return oid, pickle
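# A round-trip sketch for PacklessLog, following the state machine described
# above; the directory, oid, and pickle values are made up:
#
#     log = PacklessLog(dir='/tmp/example-commit-log')
#     log.write_object('\0\0\0\0\0\0\0\1', 'a pickle')   # START -> OPEN
#     log.promise()                                      # OPEN -> PROMISED
#     rec = log.next_object()
#     while rec is not None:
#         oid, pickle = rec
#         # ... commit the pickle to the backing store here ...
#         rec = log.next_object()
#     log.finish()                                       # PROMISED -> START
#     log.close(unlink=1)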
class FullLog(CommitLog):
# Higher level interface for reading and writing full versioning and
# undoable log records.
#
# Record types:
# 'o' - object state, consisting of an oid, and the object's pickle
# data
# 'v' - new version record, consisting of a version string and a
# version id
# 'd' - discard version, consisting of a version id
#
def write_object(self, oid, pickle):
self._append('o', (oid, pickle))
def write_new_version(self, version, vid):
self._append('v', (version, vid))
def write_discard_version(self, vid):
self._append('d', (vid,))
def next_object(self):
# Get the next object record. Return the key for unpacking and the
# object record data.
rec = self._next()
if rec is None:
return None
try:
key, data = rec
except ValueError:
raise LogCorruptedError, 'incomplete record'
if key not in 'ovd':
raise LogCorruptedError, 'bad record key: %s' % key
return key, data
# Note: these two methods write `s' records, which next_object() above
# does not recognize.
def store(self, oid, vid, nv, dataptr, pickle, previous,
dump=marshal.dump):
dump(('s', (oid, vid, nv, dataptr, pickle, previous)), self._fp)
def storeNV(self, oid, data, tid,
dump=marshal.dump, zero='\0\0\0\0\0\0\0\0'):
dump(('s', (oid, zero, zero, data, '', tid)), self._fp)
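# And a similar sketch for FullLog, interleaving object and version records;
# again, all of the values are made up:
#
#     log = FullLog(dir='/tmp/example-commit-log')
#     log.write_new_version('beta', '\0\0\0\0\0\0\0\2')
#     log.write_object('\0\0\0\0\0\0\0\1', 'a pickle')
#     log.promise()
#     rec = log.next_object()
#     while rec is not None:
#         key, data = rec
#         if key == 'o':
#             oid, pickle = data
#         elif key == 'v':
#             version, vid = data
#         elif key == 'd':
#             (vid,) = data
#         rec = log.next_object()
#     log.finish()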