Commit 1c0e3576 authored by Barry Warsaw's avatar Barry Warsaw

Merging in changes from the bsddb3Storage-picklelog-branch. Briefly:

The picklelog branch attempts to improve performance and reduce the
possibility of lock file exhaustion by optimistically writing pickle
and metadata tables at the store() call instead of during the
_finish() call.  store() will be bounded in the number of objects it
touches, but _finish() is unbounded, so we try to reduce the number of
database pages the latter might actually touch.

Other performance improvements are implemented based on hotshot
profiling.  The use of an extension module for Python 2.2 also speeds
up a critical loop.

This also implements the periodic checkpointing to improve recovery
times when the database is not cleanly shutdown.
parent 9747d366
...@@ -18,12 +18,18 @@ See Minimal.py for an implementation of Berkeley storage that does not support ...@@ -18,12 +18,18 @@ See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning. undo or versioning.
""" """
__version__ = '$Revision: 1.40 $'.split()[-2:][0] __version__ = '$Revision: 1.41 $'.split()[-2:][0]
import sys import sys
import struct import struct
import time import time
from cPickle import loads, Pickler
Pickler = Pickler()
Pickler.fast = 1 # Don't use a memo
fast_pickle_dumps = Pickler.dump
del Pickler
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net # http://pybsddb.sourceforge.net
from bsddb3 import db from bsddb3 import db
...@@ -55,6 +61,14 @@ DNE = '\377'*8 ...@@ -55,6 +61,14 @@ DNE = '\377'*8
# DEBUGGING # DEBUGGING
#DNE = 'nonexist' # does not exist #DNE = 'nonexist' # does not exist
try:
# Python 2.2
from _helper import incr
except ImportError:
# Python 2.1
def incr(refcount, delta):
return p64(U64(refcount) + delta)
class Full(BerkeleyBase, ConflictResolvingStorage): class Full(BerkeleyBase, ConflictResolvingStorage):
...@@ -98,6 +112,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -98,6 +112,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# Maps the concrete object referenced by oid+revid to that # Maps the concrete object referenced by oid+revid to that
# object's data pickle. # object's data pickle.
# #
# picklelog -- {oid+revid -> ''}
# Keeps a log of pickles that haven't been committed yet.
# This allows us to write pickles as we get them in the
# in separate BDB transactions. The value of the mapping is
# ignored.
#
# These are used only by the Full implementation. # These are used only by the Full implementation.
# #
# vids -- {version_string -> vid} # vids -- {version_string -> vid}
...@@ -167,6 +187,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -167,6 +187,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# Tables common to the base framework # Tables common to the base framework
self._serials = self._setupDB('serials') self._serials = self._setupDB('serials')
self._pickles = self._setupDB('pickles') self._pickles = self._setupDB('pickles')
self._picklelog = self._setupDB('picklelog')
# These are specific to the full implementation # These are specific to the full implementation
self._vids = self._setupDB('vids') self._vids = self._setupDB('vids')
self._versions = self._setupDB('versions') self._versions = self._setupDB('versions')
...@@ -176,6 +197,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -176,6 +197,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._txnoids = self._setupDB('txnoids', db.DB_DUP) self._txnoids = self._setupDB('txnoids', db.DB_DUP)
self._refcounts = self._setupDB('refcounts') self._refcounts = self._setupDB('refcounts')
self._pickleRefcounts = self._setupDB('pickleRefcounts') self._pickleRefcounts = self._setupDB('pickleRefcounts')
# Initialize our cache of the next available version id. # Initialize our cache of the next available version id.
record = self._versions.cursor().last() record = self._versions.cursor().last()
if record: if record:
...@@ -186,11 +208,17 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -186,11 +208,17 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
else: else:
self.__nextvid = 0L self.__nextvid = 0L
# DEBUGGING # DEBUGGING
# NOTE: some tests will fail if you enable debugging serial numbers
# because it breaks the default assumption that serial numbers are
# timestamps. Things like packing and undoing will break.
#self._nextserial = 0L #self._nextserial = 0L
#self.profiler = hotshot.Profile('profile.dat', lineevents=1)
def close(self): def close(self):
#self.profiler.close()
self._serials.close() self._serials.close()
self._pickles.close() self._pickles.close()
self._picklelog.close()
self._vids.close() self._vids.close()
self._versions.close() self._versions.close()
self._currentVersions.close() self._currentVersions.close()
...@@ -214,7 +242,18 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -214,7 +242,18 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._commitlog = FullLog(dir=self._env.db_home) self._commitlog = FullLog(dir=self._env.db_home)
self._commitlog.start() self._commitlog.start()
# To turn on hotshot profiling, uncomment the following function, and rename
# _finish() to _real_finish(). Also, uncomment out the creation of the
# profiler in _setupDBs() above, and the closing of the profiler in close()
# below. Then check out the profout.py file for dumping out the profiling
# information.
#
## def _finish(self, tid, u, d, e):
## self.profiler.runcall(self._real_finish, tid, u, d, e)
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
## pack = struct.pack
## unpack = struct.unpack
# This is called from the storage interface's tpc_finish() method. # This is called from the storage interface's tpc_finish() method.
# Its responsibilities are to finish the transaction with the # Its responsibilities are to finish the transaction with the
# underlying database. # underlying database.
...@@ -252,65 +291,60 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -252,65 +291,60 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._txnMetadata.put(tid, self._txnMetadata.put(tid,
UNDOABLE_TRANSACTION + lengths + u + d + e, UNDOABLE_TRANSACTION + lengths + u + d + e,
txn=txn) txn=txn)
picklekeys = []
metadata = []
picklerefcounts = {}
serials = []
refcounts = {}
while 1: while 1:
rec = self._commitlog.next() rec = self._commitlog.next()
if rec is None: if rec is None:
break break
op, data = rec op, data = rec
if op == 'o': if op in 'ox':
# This is a `versioned' object record. Information about # This is a `versioned' object record. Information about
# this object must be stored in the pickle table, the # this object must be stored in the pickle table, the
# object metadata table, the currentVersions tables , and # object metadata table, the currentVersions tables , and
# the transactions->oid table. # the transactions->oid table.
oid, vid, nvrevid, lrevid, pickle, prevrevid = data oid, vid, nvrevid, lrevid, refdoids, prevrevid = data
key = oid + tid key = oid + tid
if pickle: if refdoids is not None:
# This was the result of a store() call which gives us # This was the result of a store() call which gave us
# a brand new pickle, so we need to update the pickles # new pickle data. Since the pickle is already
# table. The lrevid will be empty, and we make it the # stored, we just need to twiddle with reference
# tid of this transaction # counts. We also need to clear the picklelog for
# this object revision.
# #
# Otherwise, this was the result of a commitVersion() # Otherwise, this was the result of a commitVersion()
# or abortVersion() call, essentially moving the # or abortVersion() call, essentially moving the
# object to a new version. We don't need to update # object to a new version. We don't need to update
# the pickle table because we aren't creating a new # any of the tables because we aren't creating a new
# pickle. # pickle.
self._pickles.put(key, pickle, txn=txn)
lrevid = tid lrevid = tid
# Boost the refcount of all the objects referred to by # Boost the refcount of all the objects referred to by
# this pickle. referencesf() scans a pickle and # this pickle.
# returns the list of objects referenced by the
# pickle. BAW: the signature of referencesf() has
# changed for Zope 2.4, to make it more convenient to
# use. Gotta stick with the backwards compatible
# version for now.
# #
# FIXME: need to watch for two object revisions in the # FIXME: need to watch for two object revisions in the
# same transaction and only bump the refcount once, # same transaction and only bump the refcount once,
# since we only keep the last of any such revisions. # since we only keep the last of any such revisions.
refdoids = []
referencesf(pickle, refdoids)
for roid in refdoids: for roid in refdoids:
refcount = self._refcounts.get(roid, ZERO, txn=txn) refcounts[roid] = refcounts.get(roid, 0) + 1
refcount = p64(U64(refcount) + 1)
self._refcounts.put(roid, refcount, txn=txn)
# Update the metadata table # Update the metadata table
self._metadata.put(key, vid+nvrevid+lrevid+prevrevid, if op == 'o':
txn=txn) # `x' opcode does an immediate write to metadata
metadata.append(
(key, ''.join((vid,nvrevid,lrevid,prevrevid))))
# If we're in a real version, update this table too. This # If we're in a real version, update this table too. This
# ends up putting multiple copies of the vid/oid records # ends up putting multiple copies of the vid/oid records
# in the table, but it's easier to weed those out later # in the table, but it's easier to weed those out later
# than to weed them out now. # than to weed them out now.
if vid <> ZERO: if vid <> ZERO:
self._currentVersions.put(vid, oid, txn=txn) self._currentVersions.put(vid, oid, txn=txn)
self._serials.put(oid, tid, txn=txn) serials.append((oid, tid))
self._txnoids.put(tid, oid, txn=txn)
# Update the pickle's reference count. Remember, the # Update the pickle's reference count. Remember, the
# refcount is stored as a string, so we have to do the # refcount is stored as a string, so we have to do the
# string->long->string dance. # string->long->string dance.
refcount = self._pickleRefcounts.get(key, ZERO, txn=txn) picklerefcounts[key] = picklerefcounts.get(key, 0) + 1
refcount = p64(U64(refcount) + 1)
self._pickleRefcounts.put(key, refcount, txn=txn)
elif op == 'v': elif op == 'v':
# This is a "create-a-version" record # This is a "create-a-version" record
version, vid = data version, vid = data
...@@ -327,18 +361,56 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -327,18 +361,56 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
rec = c.next_dup() rec = c.next_dup()
finally: finally:
c.close() c.close()
# It's actually faster to boogie through this list twice
#print >> sys.stderr, 'start:', self._lockstats()
for oid, tid in serials:
self._txnoids.put(tid, oid, txn=txn)
#print >> sys.stderr, 'post-txnoids:', self._lockstats()
for oid, tid in serials:
self._serials.put(oid, tid, txn=txn)
#print >> sys.stderr, 'post-serials:', self._lockstats()
for key, data in metadata:
self._metadata.put(key, data, txn=txn)
#print >> sys.stderr, 'post-metadata:', self._lockstats()
for roid, delta in refcounts.items():
refcount = self._refcounts.get(roid, ZERO, txn=txn)
self._refcounts.put(roid, incr(refcount, delta), txn=txn)
#print >> sys.stderr, 'post-refcounts:', self._lockstats()
for key, delta in picklerefcounts.items():
refcount = self._pickleRefcounts.get(key, ZERO, txn=txn)
self._pickleRefcounts.put(key, incr(refcount, delta), txn=txn)
# We're done with the picklelog
self._picklelog.truncate(txn)
#print >> sys.stderr, 'loop-finish:', self._lockstats()
# Handle lock exhaustion differently
except db.DBNoMemoryError, e:
txn.abort()
self._docheckpoint()
raise POSException.TransactionTooLargeError, e
except: except:
# If any errors whatsoever occurred, abort the transaction with # If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since # Berkeley, leave the commit log file in the PROMISED state (since
# its changes were never committed), and re-raise the exception. # its changes were never committed), and re-raise the exception.
txn.abort() txn.abort()
self._docheckpoint()
raise raise
else: else:
# Everything is hunky-dory. Commit the Berkeley transaction, and # Everything is hunky-dory. Commit the Berkeley transaction, and
# reset the commit log for the next transaction. # reset the commit log for the next transaction.
txn.commit() txn.commit()
self._docheckpoint()
self._closelog() self._closelog()
def _abort(self):
# We need to clear the picklelog and all the stored pickles in the
# pickle log, since we're abort this transaction.
for key in self._picklelog.keys():
del self._pickles[key]
del self._metadata[key]
# Done with the picklelog
self._picklelog.truncate()
BerkeleyBase._abort(self)
# #
# Do some things in a version # Do some things in a version
# #
...@@ -579,6 +651,33 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -579,6 +651,33 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._commitlog.write_new_version(version, vid) self._commitlog.write_new_version(version, vid)
return vid return vid
def _log_object(self, oid, vid, nvrevid, data, oserial):
# Save data for later commit. We do this by writing the pickle
# directly to the pickle table and saving the pickle key in the pickle
# log. We'll also save the metadata using the same technique. We
# extract the references and save them in the transaction log.
#
# Get the oids to the objects this pickle references
refdoids = []
referencesf(data, refdoids)
# Record the update to this object in the commit log.
self._commitlog.write_object(oid, vid, nvrevid, refdoids, oserial)
# Save the pickle in the database:
txn = self._env.txn_begin()
try:
key = oid + self._serial
self._pickles.put(key, data, txn=txn)
self._metadata.put(
key,
''.join((vid, nvrevid, self._serial, oserial)),
txn=txn)
self._picklelog.put(key, '', txn=txn)
except:
txn.abort()
raise
else:
txn.commit()
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
# Transaction equivalence guard # Transaction equivalence guard
if transaction is not self._transaction: if transaction is not self._transaction:
...@@ -643,8 +742,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -643,8 +742,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
tuple(map(U64, (oid, ovid, vid)))) tuple(map(U64, (oid, ovid, vid))))
else: else:
nvrevid = onvrevid nvrevid = onvrevid
# Record the update to this object in the commit log. # Store the object
self._commitlog.write_object(oid, vid, nvrevid, data, oserial) self._log_object(oid, vid, nvrevid, data, oserial)
finally: finally:
self._lock_release() self._lock_release()
# Return our cached serial number for the object. If conflict # Return our cached serial number for the object. If conflict
...@@ -776,8 +875,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -776,8 +875,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# see duplicate oids in this iteration. # see duplicate oids in this iteration.
oids[oid] = 1 oids[oid] = 1
for oid, vid, nvrevid, data, prevrevid in newstates: for oid, vid, nvrevid, data, prevrevid in newstates:
self._commitlog.write_object(oid, vid, nvrevid, data, self._log_object(oid, vid, nvrevid, data, prevrevid)
prevrevid)
oids[oid] = 1 oids[oid] = 1
return oids.keys() return oids.keys()
finally: finally:
......
...@@ -18,12 +18,18 @@ See Minimal.py for an implementation of Berkeley storage that does not support ...@@ -18,12 +18,18 @@ See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning. undo or versioning.
""" """
__version__ = '$Revision: 1.40 $'.split()[-2:][0] __version__ = '$Revision: 1.41 $'.split()[-2:][0]
import sys import sys
import struct import struct
import time import time
from cPickle import loads, Pickler
Pickler = Pickler()
Pickler.fast = 1 # Don't use a memo
fast_pickle_dumps = Pickler.dump
del Pickler
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net # http://pybsddb.sourceforge.net
from bsddb3 import db from bsddb3 import db
...@@ -55,6 +61,14 @@ DNE = '\377'*8 ...@@ -55,6 +61,14 @@ DNE = '\377'*8
# DEBUGGING # DEBUGGING
#DNE = 'nonexist' # does not exist #DNE = 'nonexist' # does not exist
try:
# Python 2.2
from _helper import incr
except ImportError:
# Python 2.1
def incr(refcount, delta):
return p64(U64(refcount) + delta)
class Full(BerkeleyBase, ConflictResolvingStorage): class Full(BerkeleyBase, ConflictResolvingStorage):
...@@ -98,6 +112,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -98,6 +112,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# Maps the concrete object referenced by oid+revid to that # Maps the concrete object referenced by oid+revid to that
# object's data pickle. # object's data pickle.
# #
# picklelog -- {oid+revid -> ''}
# Keeps a log of pickles that haven't been committed yet.
# This allows us to write pickles as we get them in the
# in separate BDB transactions. The value of the mapping is
# ignored.
#
# These are used only by the Full implementation. # These are used only by the Full implementation.
# #
# vids -- {version_string -> vid} # vids -- {version_string -> vid}
...@@ -167,6 +187,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -167,6 +187,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# Tables common to the base framework # Tables common to the base framework
self._serials = self._setupDB('serials') self._serials = self._setupDB('serials')
self._pickles = self._setupDB('pickles') self._pickles = self._setupDB('pickles')
self._picklelog = self._setupDB('picklelog')
# These are specific to the full implementation # These are specific to the full implementation
self._vids = self._setupDB('vids') self._vids = self._setupDB('vids')
self._versions = self._setupDB('versions') self._versions = self._setupDB('versions')
...@@ -176,6 +197,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -176,6 +197,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._txnoids = self._setupDB('txnoids', db.DB_DUP) self._txnoids = self._setupDB('txnoids', db.DB_DUP)
self._refcounts = self._setupDB('refcounts') self._refcounts = self._setupDB('refcounts')
self._pickleRefcounts = self._setupDB('pickleRefcounts') self._pickleRefcounts = self._setupDB('pickleRefcounts')
# Initialize our cache of the next available version id. # Initialize our cache of the next available version id.
record = self._versions.cursor().last() record = self._versions.cursor().last()
if record: if record:
...@@ -186,11 +208,17 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -186,11 +208,17 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
else: else:
self.__nextvid = 0L self.__nextvid = 0L
# DEBUGGING # DEBUGGING
# NOTE: some tests will fail if you enable debugging serial numbers
# because it breaks the default assumption that serial numbers are
# timestamps. Things like packing and undoing will break.
#self._nextserial = 0L #self._nextserial = 0L
#self.profiler = hotshot.Profile('profile.dat', lineevents=1)
def close(self): def close(self):
#self.profiler.close()
self._serials.close() self._serials.close()
self._pickles.close() self._pickles.close()
self._picklelog.close()
self._vids.close() self._vids.close()
self._versions.close() self._versions.close()
self._currentVersions.close() self._currentVersions.close()
...@@ -214,7 +242,18 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -214,7 +242,18 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._commitlog = FullLog(dir=self._env.db_home) self._commitlog = FullLog(dir=self._env.db_home)
self._commitlog.start() self._commitlog.start()
# To turn on hotshot profiling, uncomment the following function, and rename
# _finish() to _real_finish(). Also, uncomment out the creation of the
# profiler in _setupDBs() above, and the closing of the profiler in close()
# below. Then check out the profout.py file for dumping out the profiling
# information.
#
## def _finish(self, tid, u, d, e):
## self.profiler.runcall(self._real_finish, tid, u, d, e)
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
## pack = struct.pack
## unpack = struct.unpack
# This is called from the storage interface's tpc_finish() method. # This is called from the storage interface's tpc_finish() method.
# Its responsibilities are to finish the transaction with the # Its responsibilities are to finish the transaction with the
# underlying database. # underlying database.
...@@ -252,65 +291,60 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -252,65 +291,60 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._txnMetadata.put(tid, self._txnMetadata.put(tid,
UNDOABLE_TRANSACTION + lengths + u + d + e, UNDOABLE_TRANSACTION + lengths + u + d + e,
txn=txn) txn=txn)
picklekeys = []
metadata = []
picklerefcounts = {}
serials = []
refcounts = {}
while 1: while 1:
rec = self._commitlog.next() rec = self._commitlog.next()
if rec is None: if rec is None:
break break
op, data = rec op, data = rec
if op == 'o': if op in 'ox':
# This is a `versioned' object record. Information about # This is a `versioned' object record. Information about
# this object must be stored in the pickle table, the # this object must be stored in the pickle table, the
# object metadata table, the currentVersions tables , and # object metadata table, the currentVersions tables , and
# the transactions->oid table. # the transactions->oid table.
oid, vid, nvrevid, lrevid, pickle, prevrevid = data oid, vid, nvrevid, lrevid, refdoids, prevrevid = data
key = oid + tid key = oid + tid
if pickle: if refdoids is not None:
# This was the result of a store() call which gives us # This was the result of a store() call which gave us
# a brand new pickle, so we need to update the pickles # new pickle data. Since the pickle is already
# table. The lrevid will be empty, and we make it the # stored, we just need to twiddle with reference
# tid of this transaction # counts. We also need to clear the picklelog for
# this object revision.
# #
# Otherwise, this was the result of a commitVersion() # Otherwise, this was the result of a commitVersion()
# or abortVersion() call, essentially moving the # or abortVersion() call, essentially moving the
# object to a new version. We don't need to update # object to a new version. We don't need to update
# the pickle table because we aren't creating a new # any of the tables because we aren't creating a new
# pickle. # pickle.
self._pickles.put(key, pickle, txn=txn)
lrevid = tid lrevid = tid
# Boost the refcount of all the objects referred to by # Boost the refcount of all the objects referred to by
# this pickle. referencesf() scans a pickle and # this pickle.
# returns the list of objects referenced by the
# pickle. BAW: the signature of referencesf() has
# changed for Zope 2.4, to make it more convenient to
# use. Gotta stick with the backwards compatible
# version for now.
# #
# FIXME: need to watch for two object revisions in the # FIXME: need to watch for two object revisions in the
# same transaction and only bump the refcount once, # same transaction and only bump the refcount once,
# since we only keep the last of any such revisions. # since we only keep the last of any such revisions.
refdoids = []
referencesf(pickle, refdoids)
for roid in refdoids: for roid in refdoids:
refcount = self._refcounts.get(roid, ZERO, txn=txn) refcounts[roid] = refcounts.get(roid, 0) + 1
refcount = p64(U64(refcount) + 1)
self._refcounts.put(roid, refcount, txn=txn)
# Update the metadata table # Update the metadata table
self._metadata.put(key, vid+nvrevid+lrevid+prevrevid, if op == 'o':
txn=txn) # `x' opcode does an immediate write to metadata
metadata.append(
(key, ''.join((vid,nvrevid,lrevid,prevrevid))))
# If we're in a real version, update this table too. This # If we're in a real version, update this table too. This
# ends up putting multiple copies of the vid/oid records # ends up putting multiple copies of the vid/oid records
# in the table, but it's easier to weed those out later # in the table, but it's easier to weed those out later
# than to weed them out now. # than to weed them out now.
if vid <> ZERO: if vid <> ZERO:
self._currentVersions.put(vid, oid, txn=txn) self._currentVersions.put(vid, oid, txn=txn)
self._serials.put(oid, tid, txn=txn) serials.append((oid, tid))
self._txnoids.put(tid, oid, txn=txn)
# Update the pickle's reference count. Remember, the # Update the pickle's reference count. Remember, the
# refcount is stored as a string, so we have to do the # refcount is stored as a string, so we have to do the
# string->long->string dance. # string->long->string dance.
refcount = self._pickleRefcounts.get(key, ZERO, txn=txn) picklerefcounts[key] = picklerefcounts.get(key, 0) + 1
refcount = p64(U64(refcount) + 1)
self._pickleRefcounts.put(key, refcount, txn=txn)
elif op == 'v': elif op == 'v':
# This is a "create-a-version" record # This is a "create-a-version" record
version, vid = data version, vid = data
...@@ -327,18 +361,56 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -327,18 +361,56 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
rec = c.next_dup() rec = c.next_dup()
finally: finally:
c.close() c.close()
# It's actually faster to boogie through this list twice
#print >> sys.stderr, 'start:', self._lockstats()
for oid, tid in serials:
self._txnoids.put(tid, oid, txn=txn)
#print >> sys.stderr, 'post-txnoids:', self._lockstats()
for oid, tid in serials:
self._serials.put(oid, tid, txn=txn)
#print >> sys.stderr, 'post-serials:', self._lockstats()
for key, data in metadata:
self._metadata.put(key, data, txn=txn)
#print >> sys.stderr, 'post-metadata:', self._lockstats()
for roid, delta in refcounts.items():
refcount = self._refcounts.get(roid, ZERO, txn=txn)
self._refcounts.put(roid, incr(refcount, delta), txn=txn)
#print >> sys.stderr, 'post-refcounts:', self._lockstats()
for key, delta in picklerefcounts.items():
refcount = self._pickleRefcounts.get(key, ZERO, txn=txn)
self._pickleRefcounts.put(key, incr(refcount, delta), txn=txn)
# We're done with the picklelog
self._picklelog.truncate(txn)
#print >> sys.stderr, 'loop-finish:', self._lockstats()
# Handle lock exhaustion differently
except db.DBNoMemoryError, e:
txn.abort()
self._docheckpoint()
raise POSException.TransactionTooLargeError, e
except: except:
# If any errors whatsoever occurred, abort the transaction with # If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since # Berkeley, leave the commit log file in the PROMISED state (since
# its changes were never committed), and re-raise the exception. # its changes were never committed), and re-raise the exception.
txn.abort() txn.abort()
self._docheckpoint()
raise raise
else: else:
# Everything is hunky-dory. Commit the Berkeley transaction, and # Everything is hunky-dory. Commit the Berkeley transaction, and
# reset the commit log for the next transaction. # reset the commit log for the next transaction.
txn.commit() txn.commit()
self._docheckpoint()
self._closelog() self._closelog()
def _abort(self):
# We need to clear the picklelog and all the stored pickles in the
# pickle log, since we're abort this transaction.
for key in self._picklelog.keys():
del self._pickles[key]
del self._metadata[key]
# Done with the picklelog
self._picklelog.truncate()
BerkeleyBase._abort(self)
# #
# Do some things in a version # Do some things in a version
# #
...@@ -579,6 +651,33 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -579,6 +651,33 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._commitlog.write_new_version(version, vid) self._commitlog.write_new_version(version, vid)
return vid return vid
def _log_object(self, oid, vid, nvrevid, data, oserial):
# Save data for later commit. We do this by writing the pickle
# directly to the pickle table and saving the pickle key in the pickle
# log. We'll also save the metadata using the same technique. We
# extract the references and save them in the transaction log.
#
# Get the oids to the objects this pickle references
refdoids = []
referencesf(data, refdoids)
# Record the update to this object in the commit log.
self._commitlog.write_object(oid, vid, nvrevid, refdoids, oserial)
# Save the pickle in the database:
txn = self._env.txn_begin()
try:
key = oid + self._serial
self._pickles.put(key, data, txn=txn)
self._metadata.put(
key,
''.join((vid, nvrevid, self._serial, oserial)),
txn=txn)
self._picklelog.put(key, '', txn=txn)
except:
txn.abort()
raise
else:
txn.commit()
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
# Transaction equivalence guard # Transaction equivalence guard
if transaction is not self._transaction: if transaction is not self._transaction:
...@@ -643,8 +742,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -643,8 +742,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
tuple(map(U64, (oid, ovid, vid)))) tuple(map(U64, (oid, ovid, vid))))
else: else:
nvrevid = onvrevid nvrevid = onvrevid
# Record the update to this object in the commit log. # Store the object
self._commitlog.write_object(oid, vid, nvrevid, data, oserial) self._log_object(oid, vid, nvrevid, data, oserial)
finally: finally:
self._lock_release() self._lock_release()
# Return our cached serial number for the object. If conflict # Return our cached serial number for the object. If conflict
...@@ -776,8 +875,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -776,8 +875,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# see duplicate oids in this iteration. # see duplicate oids in this iteration.
oids[oid] = 1 oids[oid] = 1
for oid, vid, nvrevid, data, prevrevid in newstates: for oid, vid, nvrevid, data, prevrevid in newstates:
self._commitlog.write_object(oid, vid, nvrevid, data, self._log_object(oid, vid, nvrevid, data, prevrevid)
prevrevid)
oids[oid] = 1 oids[oid] = 1
return oids.keys() return oids.keys()
finally: finally:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment