Commit a64b1c71 authored by Barry Warsaw's avatar Barry Warsaw

Fix bugs exposed by unit testing. Note that this version of the

storage has a failing abortVersion(), so it's mostly a checkpoint.

Not yet tested: commitVersion(), transactionalUndo(), undoLog().

Not yet implemented: history(), pack()

Specific changes:

    - Table txnOids renamed to txnoids everywhere.

    - Define a module global `zero' constant and use (via global)
      wherever appropriate (should be named ZERO but that's ugly).

    - _finish(): Only boost the refcount of objects referenced by the
      pickle when we've actually got a new pickle.  Also, add a note
      about the currentVersions table, which will contain multiple
      copies of vid/oid records.  It's easier to weed them out later
      than to weed them out here.  One of the places you need to weed
      them all out is at the end of this method, in the
      `delete-a-version' branch.

    - abortVersion(): Record oids found by scanning the
      currentVersions table in a set, to account for the possible
      multiple vid/oid records.  Fix some typos.  When calling
      write_nonversion_object() on the CommitLog, the prevrevid ought
      to be the revid of the current object revision, for undo
      purposes.

    - load(): Where we previously had a comment that asked whether we
      should assert that the version is empty (after we've checked
      everything else), now we really check that and raise a
      VersionError if not.

    - loadSerial(): Fix a typo.

    - store(): Inside the old-revision-id-is-true test, if the old vid
      is not zero and it is equal to the new vid, then the non-version
      revision of the object is the old non-version revision (added
      the else: clause).

    - transactionalUndo(): Implemented but not yet tested.

    - undo(): Removed.

    - versionEmpty(): Semantics are to return true if an unknown
      version is given.  As an extra bonus, if version is empty
      (i.e. the non-version) check the serials table.
parent 0dd3eab8
...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support ...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning. undo or versioning.
""" """
# $Revision: 1.7 $ # $Revision: 1.8 $
__version__ = '0.1' __version__ = '0.1'
import struct import struct
...@@ -33,6 +33,8 @@ from CommitLog import FullLog ...@@ -33,6 +33,8 @@ from CommitLog import FullLog
UNDOABLE_TRANSACTION = 'Y' UNDOABLE_TRANSACTION = 'Y'
PROTECTED_TRANSACTION = 'N' PROTECTED_TRANSACTION = 'N'
zero = '\0'*8
class InternalInconsistencyError(POSException.POSError, AssertionError): class InternalInconsistencyError(POSException.POSError, AssertionError):
...@@ -127,7 +129,7 @@ class Full(BerkeleyBase): ...@@ -127,7 +129,7 @@ class Full(BerkeleyBase):
# ext is the extra info passed to tpc_finish(). It is a # ext is the extra info passed to tpc_finish(). It is a
# dictionary that we get already pickled by BaseStorage. # dictionary that we get already pickled by BaseStorage.
# #
# txnOids -- {tid -> [oid]} # txnoids -- {tid -> [oid]}
# Maps transaction ids to the oids of the objects modified by the # Maps transaction ids to the oids of the objects modified by the
# transaction. # transaction.
# #
...@@ -147,7 +149,7 @@ class Full(BerkeleyBase): ...@@ -147,7 +149,7 @@ class Full(BerkeleyBase):
self._currentVersions = self._setupDB('currentVersions', db.DB_DUP) self._currentVersions = self._setupDB('currentVersions', db.DB_DUP)
self._metadata = self._setupDB('metadata') self._metadata = self._setupDB('metadata')
self._txnMetadata = self._setupDB('txnMetadata') self._txnMetadata = self._setupDB('txnMetadata')
self._txnOids = self._setupDB('txnOids', db.DB_DUP) self._txnoids = self._setupDB('txnoids', db.DB_DUP)
self._refcounts = self._setupDB('refcounts') self._refcounts = self._setupDB('refcounts')
self._pickleRefcounts = self._setupDB('pickleRefcounts') self._pickleRefcounts = self._setupDB('pickleRefcounts')
# Initialize our cache of the next available version id. # Initialize our cache of the next available version id.
...@@ -168,7 +170,7 @@ class Full(BerkeleyBase): ...@@ -168,7 +170,7 @@ class Full(BerkeleyBase):
self._currentVersions.close() self._currentVersions.close()
self._metadata.close() self._metadata.close()
self._txnMetadata.close() self._txnMetadata.close()
self._txnOids.close() self._txnoids.close()
self._refcounts.close() self._refcounts.close()
self._pickleRefcounts.close() self._pickleRefcounts.close()
BerkeleyBase.close(self) BerkeleyBase.close(self)
...@@ -189,6 +191,7 @@ class Full(BerkeleyBase): ...@@ -189,6 +191,7 @@ class Full(BerkeleyBase):
self._commitlog.promise() self._commitlog.promise()
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
global zero
# This is called from the storage interface's tpc_finish() method. # This is called from the storage interface's tpc_finish() method.
# Its responsibilities are to finish the transaction with the # Its responsibilities are to finish the transaction with the
# underlying database. # underlying database.
...@@ -212,7 +215,6 @@ class Full(BerkeleyBase): ...@@ -212,7 +215,6 @@ class Full(BerkeleyBase):
# u is the user associated with the transaction # u is the user associated with the transaction
# d is the description of the transaction # d is the description of the transaction
# e is the transaction extension # e is the transaction extension
zero = '\0'*8
txn = self._env.txn_begin() txn = self._env.txn_begin()
try: try:
# Update the transaction metadata # Update the transaction metadata
...@@ -252,25 +254,29 @@ class Full(BerkeleyBase): ...@@ -252,25 +254,29 @@ class Full(BerkeleyBase):
# pickle. # pickle.
self._pickles.put(key, pickle, txn=txn) self._pickles.put(key, pickle, txn=txn)
lrevid = tid lrevid = tid
# Boost the refcount of all the objects referred to by
# this pickle. referencesf() scans a pickle and
# returns the list of objects referenced by the
# pickle. BAW: the signature of referencesf() has
# changed for Zope 2.4, to make it more convenient to
# use. Gotta stick with the backwards compatible
# version for now.
refdoids = []
referencesf(pickle, refdoids)
for roid in refdoids:
refcount = self._refcounts.get(roid, zero, txn=txn)
refcount = utils.p64(utils.U64(refcount) + 1)
self._refcounts.put(roid, refcount, txn=txn)
# Update the metadata table # Update the metadata table
self._metadata.put(key, vid+nvrevid+tid+prevrevid, txn=txn) self._metadata.put(key, vid+nvrevid+tid+prevrevid, txn=txn)
# If we're in a real version, update this table too # If we're in a real version, update this table too. This
# ends up putting multiple copies of the vid/oid records
# in the table, but it's easier to weed those out later
# than to weed them out now.
if vid <> zero: if vid <> zero:
self._currentVersions.put(vid, oid, txn=txn) self._currentVersions.put(vid, oid, txn=txn)
self._serials.put(oid, tid, txn=txn) self._serials.put(oid, tid, txn=txn)
self._txnOids.put(tid, oid, txn=txn) self._txnoids.put(tid, oid, txn=txn)
# Boost the refcount of all the objects referred to by
# this pickle. referencesf() scans a pickle and returns
# the list of objects referenced by the pickle. BAW: the
# signature of referencesf() has changed for Zope 2.4, to
# make it more convenient to use. Gotta stick with the
# backwards compatible version for now.
refdoids = []
referencesf(pickle, refdoids)
for roid in refdoids:
refcount = self._refcounts.get(roid, zero, txn=txn)
refcount = utils.p64(utils.U64(refcount) + 1)
self._refcounts.put(roid, refcount, txn=txn)
# Update the pickle's reference count. Remember, the # Update the pickle's reference count. Remember, the
# refcount is stored as a string, so we have to do the # refcount is stored as a string, so we have to do the
# string->long->string dance. # string->long->string dance.
...@@ -285,7 +291,15 @@ class Full(BerkeleyBase): ...@@ -285,7 +291,15 @@ class Full(BerkeleyBase):
elif op == 'd': elif op == 'd':
# This is a "delete-a-version" record # This is a "delete-a-version" record
vid = data[0] vid = data[0]
self._currentVersions.delete(vid, txn=txn) c = self._currentVersions.cursor(txn=txn)
try:
rec = c.set(vid)
while rec:
c.delete()
rec = c.next()
finally:
c.close()
except: except:
# If any errors whatsoever occurred, abort the transaction with # If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since # Berkeley, leave the commit log file in the PROMISED state (since
...@@ -303,24 +317,23 @@ class Full(BerkeleyBase): ...@@ -303,24 +317,23 @@ class Full(BerkeleyBase):
# #
def abortVersion(self, version, transaction): def abortVersion(self, version, transaction):
global zero
# Abort the version, but retain enough information to make the abort # Abort the version, but retain enough information to make the abort
# undoable. # undoable.
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
zero = '\0'*8
c = None # the currentVersions cursor c = None # the currentVersions cursor
self._lock_acquire() self._lock_acquire()
try: try:
# The transaction id for this abort
tid = self._serial
# Let KeyErrors percolate up. This is how we ensure that the # Let KeyErrors percolate up. This is how we ensure that the
# version we're aborting is not the empty string. # version we're aborting is not the empty string.
vid = self._vids[version] vid = self._vids[version]
# We need to keep track of the oids that are affected by the abort # We need to keep track of the oids that are affected by the abort
# so that we can return it to the connection, which must # so that we can return it to the connection, which must
# invalidate the objects so they can be reloaded. # invalidate the objects so they can be reloaded. We use a set
oids = [] # here because currentVersions may have duplicate vid/oid records.
oids = {}
c = self._currentVersions.cursor() c = self._currentVersions.cursor()
rec = c.set(vid) rec = c.set(vid)
# Now cruise through all the records for this version, looking for # Now cruise through all the records for this version, looking for
...@@ -333,7 +346,7 @@ class Full(BerkeleyBase): ...@@ -333,7 +346,7 @@ class Full(BerkeleyBase):
oid = rec[1] # ignore the key oid = rec[1] # ignore the key
revid = self._serials[oid] revid = self._serials[oid]
meta = self._metadata[oid+revid] meta = self._metadata[oid+revid]
curvid, nvrevid = struct.unpack('8s8s8s', meta[:16]) curvid, nvrevid = struct.unpack('8s8s', meta[:16])
# Make sure that the vid in the metadata record is the same as # Make sure that the vid in the metadata record is the same as
# the vid we sucked out of the vids table, otherwise we've got # the vid we sucked out of the vids table, otherwise we've got
# an internal database inconsistency. # an internal database inconsistency.
...@@ -345,29 +358,31 @@ class Full(BerkeleyBase): ...@@ -345,29 +358,31 @@ class Full(BerkeleyBase):
continue continue
# Get the non-version data for the object # Get the non-version data for the object
nvmeta = self._metadata[oid+nvrevid] nvmeta = self._metadata[oid+nvrevid]
curvid, nvrevid, lrevid = unpack('8s8s8s', nvmeta[:24]) curvid, nvrevid, lrevid = struct.unpack('8s8s8s', nvmeta[:24])
# We expect curvid to be zero because we just got the # We expect curvid to be zero because we just got the
# non-version entry. # non-version entry.
if curvid <> zero: if curvid <> zero:
raise InternalInconsistencyError raise InternalInconsistencyError
# Write the object id, live revision id, and this transaction # Write the object id, live revision id, the current revision
# id (which serves as the previous revid) to the commit log. # id (which serves as the previous revid to this transaction)
self._commitlog.write_nonversion_object(oid, lrevid, tid) # to the commit log.
self._commitlog.write_nonversion_object(oid, lrevid, revid)
# Remember to return the oid... # Remember to return the oid...
oids.append(oid) oids[oid] = 1
# ...and get the next record for this vid # ...and get the next record for this vid
rec = c.next_dup() rec = c.next()
# We've now processed all the objects on the discarded version, so # We've now processed all the objects on the discarded version, so
# write this to the commit log and return the list of oids to # write this to the commit log and return the list of oids to
# invalidate. # invalidate.
self._commitlog.write_discard_version(vid) self._commitlog.write_discard_version(vid)
return oids return oids.keys()
finally: finally:
if c: if c:
c.close() c.close()
self._lock_release() self._lock_release()
def commitVersion(self, src, dest, transaction): def commitVersion(self, src, dest, transaction):
global zero
# Commit a source version `src' to a destination version `dest'. It's # Commit a source version `src' to a destination version `dest'. It's
# perfectly valid to move an object from one version to another. src # perfectly valid to move an object from one version to another. src
# and dest are version strings, and if we're committing to a # and dest are version strings, and if we're committing to a
...@@ -375,7 +390,6 @@ class Full(BerkeleyBase): ...@@ -375,7 +390,6 @@ class Full(BerkeleyBase):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
zero = '\0'*8
c = None # the currentVersions cursor c = None # the currentVersions cursor
self._lock_acquire() self._lock_acquire()
try: try:
...@@ -414,7 +428,7 @@ class Full(BerkeleyBase): ...@@ -414,7 +428,7 @@ class Full(BerkeleyBase):
# Remember to return the oid... # Remember to return the oid...
oids.append(oid) oids.append(oid)
# ...and get the next record for this vid # ...and get the next record for this vid
rec = c.next_dup() rec = c.next()
# Now that we're done, we can discard this version # Now that we're done, we can discard this version
self._commitlog.write_discard_version(vid) self._commitlog.write_discard_version(vid)
return oids return oids
...@@ -424,6 +438,7 @@ class Full(BerkeleyBase): ...@@ -424,6 +438,7 @@ class Full(BerkeleyBase):
self._lock_release() self._lock_release()
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
global zero
# Return the version string of the version that contains the most # Return the version string of the version that contains the most
# recent change to the object. The empty string means the change # recent change to the object. The empty string means the change
# isn't in a version. # isn't in a version.
...@@ -432,7 +447,7 @@ class Full(BerkeleyBase): ...@@ -432,7 +447,7 @@ class Full(BerkeleyBase):
# Let KeyErrors percolate up # Let KeyErrors percolate up
revid = self._serials[oid] revid = self._serials[oid]
vid = self._metadata[oid+revid][:8] vid = self._metadata[oid+revid][:8]
if vid == '\0'*8: if vid == zero:
# Not in a version # Not in a version
return '' return ''
return self._versions[vid] return self._versions[vid]
...@@ -444,6 +459,7 @@ class Full(BerkeleyBase): ...@@ -444,6 +459,7 @@ class Full(BerkeleyBase):
# #
def load(self, oid, version): def load(self, oid, version):
global zero
# BAW: in the face of application level conflict resolution, it's # BAW: in the face of application level conflict resolution, it's
# /possible/ to load an object that is sitting in the commit log. # /possible/ to load an object that is sitting in the commit log.
# That's bogus though because there's no way to know what to return; # That's bogus though because there's no way to know what to return;
...@@ -468,13 +484,15 @@ class Full(BerkeleyBase): ...@@ -468,13 +484,15 @@ class Full(BerkeleyBase):
# object is living in is equal to the version that's being # object is living in is equal to the version that's being
# requested, then we can simply return the pickle referenced by # requested, then we can simply return the pickle referenced by
# the revid. # the revid.
if vid == '\0'*8 or self._versions[vid] == version: if vid == zero or self._versions[vid] == version:
return self._pickles[oid+lrevid], revid return self._pickles[oid+lrevid], revid
# Otherwise, we recognize that an object cannot be stored in more # Otherwise, we recognize that an object cannot be stored in more
# than one version at a time (although this may change if/when # than one version at a time (although this may change if/when
# "Unlocked" versions are added). So we return the non-version # "Unlocked" versions are added). So we return the non-version
# revision of the object. BAW: should we assert that version is # revision of the object. Make sure the version is empty though.
# empty in this case? if version:
raise POSException.VersionError(
'Undefined version: %s' % version)
lrevid = self._metadata[oid+nvrevid][16:24] lrevid = self._metadata[oid+nvrevid][16:24]
return self._pickles[oid+lrevid], nvrevid return self._pickles[oid+lrevid], nvrevid
finally: finally:
...@@ -488,7 +506,7 @@ class Full(BerkeleyBase): ...@@ -488,7 +506,7 @@ class Full(BerkeleyBase):
# corresponding to the oid and the supplied serial # corresponding to the oid and the supplied serial
# a.k.a. revision. # a.k.a. revision.
lrevid = self._metadata[oid+serial][16:24] lrevid = self._metadata[oid+serial][16:24]
return self._pickle[oid+lrevid] return self._pickles[oid+lrevid]
finally: finally:
self._lock_release() self._lock_release()
...@@ -511,10 +529,12 @@ class Full(BerkeleyBase): ...@@ -511,10 +529,12 @@ class Full(BerkeleyBase):
return vid return vid
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
global zero
# Transaction equivalence guard
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
zero = '\0'*8
self._lock_acquire() self._lock_acquire()
try: try:
# Check for conflict errors. JF says: under some circumstances, # Check for conflict errors. JF says: under some circumstances,
...@@ -567,7 +587,9 @@ class Full(BerkeleyBase): ...@@ -567,7 +587,9 @@ class Full(BerkeleyBase):
# current version. That's a no no. # current version. That's a no no.
raise POSException.VersionLockError( raise POSException.VersionLockError(
'version mismatch for object %s (was: %s, got: %s)' % 'version mismatch for object %s (was: %s, got: %s)' %
map(utils.U64, (oid, ovid, vid))) tuple(map(utils.U64, (oid, ovid, vid))))
else:
nvrevid = onvrevid
# Record the update to this object in the commit log. # Record the update to this object in the commit log.
self._commitlog.write_object(oid, vid, nvrevid, data, oserial) self._commitlog.write_object(oid, vid, nvrevid, data, oserial)
finally: finally:
...@@ -575,191 +597,62 @@ class Full(BerkeleyBase): ...@@ -575,191 +597,62 @@ class Full(BerkeleyBase):
# Return our cached serial number for the object. # Return our cached serial number for the object.
return self._serial return self._serial
def _zaprevision(self, key, txn):
# Delete the metadata record pointed to by the key, decrefing the
# reference counts of the pickle pointed to by this record, and
# perform cascading decrefs on the referenced objects.
#
# We need the lrevid which points to the pickle for this revision...
vid, nvrevid, lrevid = self._metadata.get(key, txn=txn)[16:24]
# ...and now delete the metadata record for this object revision
self._metadata.delete(key, txn=txn)
# Decref the reference count of the pickle pointed to by oid+lrevid.
# If the reference count goes to zero, we can garbage collect the
# pickle, and decref all the objects pointed to by the pickle (with of
# course, cascading garbage collection).
pkey = key[:8] + lrevid
refcount = self._pickleRefcounts.get(pkey, txn=txn)
# It's possible the pickleRefcounts entry for this oid has already
# been deleted by a previous pass of _zaprevision(). If so, we're
# done.
if refcount is None:
return
refcount = utils.U64(refcount) - 1
if refcount > 0:
self._pickleRefcounts.put(pkey, utils.p64(refcount), txn=txn)
return
# The refcount of this pickle has gone to zero, so we need to garbage
# collect it, and decref all the objects it points to.
self._pickleRefcounts.delete(pkey, txn=txn)
pickle = self._pickles.get(pkey, txn=txn)
# Sniff the pickle to get the objects it refers to
collectables = []
refoids = []
referencesf(pickle, oids)
# Now decref the reference counts for each of those objects. If it
# goes to zero, remember the oid so we can recursively zap its
# metadata too.
for oid in refoids:
refcount = self._refcounts.get(oid, txn=txn)
refcount = utils.U64(refcount) - 1
if refcount > 0:
self._refcounts.put(oid, utils.p64(refcount), txn=txn)
else:
collectables.append(oid)
# Now for all objects whose refcounts just went to zero, we want to
# delete any records that pertain to this object. When we get to
# deleting the metadata record, we'll do it recursively so as to
# decref any pickles it points to. For everything else, we'll do it
# in the most efficient manner possible.
tids = []
for oid in collectables:
self._serials.delete(oid, txn=txn)
self._refcounts.delete(oid, txn=txn)
# To delete all the metadata records associated with this object
# id, we use a trick of Berkeley cursor objects to only partially
# specify the key. This works because keys are compared
# lexically, with shorter keys collating before longer keys.
c = self._metadata.cursor()
try:
rec = c.set(oid)
while rec and rec[0][:8] == oid:
# Remember the transaction ids so we can clean up the
# txnOids table below. Note that we don't record the vids
# because now that we don't have destructive undo,
# _zaprevisions() can only be called during a pack() and
# it is impossible to pack current records (and hence
# currentVersions).
tids.append(rec[0][8:]) # second 1/2 of the key
self._zaprevision(rec[0], txn)
rec = c.next()
finally:
c.close()
# Delete all the txnOids entries that referenced this oid
for tid in tids:
c = self._txnOids.cursor(txn=txn)
try:
rec = c.set_both(tid, oid)
while rec:
# Although unlikely, it is possible that an object got
# modified more than once in a transaction.
c.delete()
rec = c.next_dup()
finally:
c.close()
def transactionalUndo(self, tid, transaction): def transactionalUndo(self, tid, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
oids = [] newrevs = []
self._lock_acquire() c = None
try:
return oids
finally:
self._lock_release()
# REMOVE ME -- DON'T IMPLEMENT UNDO SINCE WE'RE GOING TO IMPLEMENT
# transactionalUndo() INSTEAD
def undo(self, tid):
# Attempt to undo transaction. NOTE: the current storage interface
# documentation says that this method takes a third argument, which is
# a limit on the number of oids to return. JF says, "Let's get rid of
# the third argument."
c = None # txnOids cursor
oids = []
zero = '\0'*8
self._lock_acquire() self._lock_acquire()
txn = self._env.txn_begin()
try: try:
# Make sure the transaction is undoable. If this transaction # First, make sure the transaction isn't protected by a pack
# occurred earlier than a pack operation, it is no longer status = self._txnMetadata[tid][1]
# undoable. The status flag indicates its undoability.
status = self._txnMetadata.get(tid, txn=txn)[1]
if status == PROTECTED_TRANSACTION: if status == PROTECTED_TRANSACTION:
raise POSException.UndoError, 'Transaction cannot be undone' raise POSException.UndoError, 'Transaction cannot be undone'
# Create the cursor and begin the transaction
c = self._txnOids.cursor()
try:
rec = c.set(tid)
while rec:
oid = rec[1]
oids.append(oid)
# Make sure the tid is current
if self._serials.get(oid, txn=txn) <> tid:
# BAW: we can only undo the most current revision of
# the object???
raise POSException.UndoError(
"Not object's current revision")
# Get rid of the metadata record for this object revision
# and perform cascading decrefs
self._zaprevision(oid, tid, txn)
key = oid + tid # Calculate all the oids modified in the transaction
# Get the metadata for this object revision, and then c = self._txnoids.cursor()
# delete the metadata record. rec = c.set(tid)
vid, nvrevid, lrevid, prevrevid = struct.unpack( while rec:
'8s8s8s8s', self._metadata.get(key, txn=txn)) oid = rec[1]
# Delete the metadata record for this object revision # In order to be able to undo this transaction, we must be
self._metadata.delete(key, txn=txn) # undoing either the current revision of the object, or we
# Decref the reference count of the pickle that we're # must be restoring the exact same pickle (identity compared)
# pointing to and garbage collect it if the refcount falls # that would be restored if we were undoing the current
# to zero. # revision.
self._decref(oid, lrevid, txn) revid = self._serials[oid]
# If the prevrevid is zero, then we've just undone the if revid == tid:
# creation of this object, so we can get rid of its prevrevid = self._metadata[oid+tid][24:]
# serials record. Otherwise, update the serials record to newrevs.append((oid, self._metadata[oid+prevrevid]))
# point to the previous revision of the object. else:
if prevrevid == zero: # Compare the lrevid (pickle pointers) for the current
self._serials.delete(oid, txn=txn) # revision of the object and the revision previous to the
else: # one we're undoing.
self._serials.put(oid, prevrevid, txn=txn) lrevid = self._metadata[oid+revid][16:24]
prec = self._metadata.get(oid+prevrevid, txn=txn) # When we undo this transaction, the previous record will
# BAW: what does it mean if the metadata for the # become the current record.
# previous revision of the object doesn't exist??? prevrevid = self._metadata[oid+tid][24:]
if not prec: # And here's the pickle pointer for that potentially
raise POSException.UndoError( # soon-to-be current record
"No previous revision for object") prevrec = self._metadata[oid+prevrevid]
pvid = prec[:8] if lrevid <> prevrec[16:24]:
# If the version for the previous revision of the # They aren't the same, so we cannot undo this txn
# object is different than the current revision of the raise POSException.UndoError, 'Cannot undo transaction'
# object, then we're undoing past a version creation, newrevs.append((oid, prevrec))
# so we can delete the entry for this vid/oid pair in # Check the next txnoid record
# the currentVersions table. rec = c.next()
if pvid <> vid: # Okay, we've checked all the oids affected by the transaction
# Don't delete the non-version revision of the # we're about to undo, and everything looks good. So now we'll
# object. # write to the log the new object records we intend to commit.
if vid <> zero: c.close()
tmpc = self._currentVersions.cursor(txn=txn) c = None
try: oids = []
rec = tmpc.get_both(vid, oid) for oid, rec in newrevs:
if rec: vid, nvrevid, lrevid, prevrevid = struct.unpack(
tmpc.delete() '8s8s8s8s', rec)
finally: self._commitlog.write_moved_object(oid, vid, nvrevid, lrevid,
tmpc.close() prevrevid)
if pvid <> zero: oids.append(oid)
# Make the previous version the current one
self._currentVersions.put(pvid, oid, txn=txn)
# Finally, delete the transaction metadata associated with
# the transaction we just undid.
self._txnMetadata.delete(tid, txn=txn)
self._txnOids.delete(tid, txn=txn)
except:
txn.abort()
raise
else:
txn.commit()
return oids return oids
finally: finally:
if c: if c:
...@@ -832,13 +725,24 @@ class Full(BerkeleyBase): ...@@ -832,13 +725,24 @@ class Full(BerkeleyBase):
# Return true if version is empty. # Return true if version is empty.
self._lock_acquire() self._lock_acquire()
try: try:
# Let these KeyError exceptions percolate up # First, check if we're querying the empty (i.e. non) version
vid = self._vids[version] if not version:
# But catch these, because it means the version is empty c = self._serials.cursor()
if self._currentVersions.has_key(vid): try:
rec = c.first()
return not rec
finally:
c.close()
# If the named version doesn't exist or there are no objects in
# the version, then return true.
missing = []
vid = self._vids.get(version, missing)
if vid is missing:
return 1 return 1
else: if self._currentVersions.has_key(vid):
return 0 return 0
else:
return 1
finally: finally:
self._lock_release() self._lock_release()
...@@ -875,6 +779,88 @@ class Full(BerkeleyBase): ...@@ -875,6 +779,88 @@ class Full(BerkeleyBase):
tid=self._current[oid] tid=self._current[oid]
finally: self._lock_release() finally: self._lock_release()
def _zaprevision(self, key, txn):
# Delete the metadata record pointed to by the key, decrefing the
# reference counts of the pickle pointed to by this record, and
# perform cascading decrefs on the referenced objects.
#
# We need the lrevid which points to the pickle for this revision...
vid, nvrevid, lrevid = self._metadata.get(key, txn=txn)[16:24]
# ...and now delete the metadata record for this object revision
self._metadata.delete(key, txn=txn)
# Decref the reference count of the pickle pointed to by oid+lrevid.
# If the reference count goes to zero, we can garbage collect the
# pickle, and decref all the objects pointed to by the pickle (with of
# course, cascading garbage collection).
pkey = key[:8] + lrevid
refcount = self._pickleRefcounts.get(pkey, txn=txn)
# It's possible the pickleRefcounts entry for this oid has already
# been deleted by a previous pass of _zaprevision(). If so, we're
# done.
if refcount is None:
return
refcount = utils.U64(refcount) - 1
if refcount > 0:
self._pickleRefcounts.put(pkey, utils.p64(refcount), txn=txn)
return
# The refcount of this pickle has gone to zero, so we need to garbage
# collect it, and decref all the objects it points to.
self._pickleRefcounts.delete(pkey, txn=txn)
pickle = self._pickles.get(pkey, txn=txn)
# Sniff the pickle to get the objects it refers to
collectables = []
refoids = []
referencesf(pickle, oids)
# Now decref the reference counts for each of those objects. If it
# goes to zero, remember the oid so we can recursively zap its
# metadata too.
for oid in refoids:
refcount = self._refcounts.get(oid, txn=txn)
refcount = utils.U64(refcount) - 1
if refcount > 0:
self._refcounts.put(oid, utils.p64(refcount), txn=txn)
else:
collectables.append(oid)
# Now for all objects whose refcounts just went to zero, we want to
# delete any records that pertain to this object. When we get to
# deleting the metadata record, we'll do it recursively so as to
# decref any pickles it points to. For everything else, we'll do it
# in the most efficient manner possible.
tids = []
for oid in collectables:
self._serials.delete(oid, txn=txn)
self._refcounts.delete(oid, txn=txn)
# To delete all the metadata records associated with this object
# id, we use a trick of Berkeley cursor objects to only partially
# specify the key. This works because keys are compared
# lexically, with shorter keys collating before longer keys.
c = self._metadata.cursor()
try:
rec = c.set(oid)
while rec and rec[0][:8] == oid:
# Remember the transaction ids so we can clean up the
# txnoids table below. Note that we don't record the vids
# because now that we don't have destructive undo,
# _zaprevisions() can only be called during a pack() and
# it is impossible to pack current records (and hence
# currentVersions).
tids.append(rec[0][8:]) # second 1/2 of the key
self._zaprevision(rec[0], txn)
rec = c.next()
finally:
c.close()
# Delete all the txnoids entries that referenced this oid
for tid in tids:
c = self._txnoids.cursor(txn=txn)
try:
rec = c.set_both(tid, oid)
while rec:
# Although unlikely, it is possible that an object got
# modified more than once in a transaction.
c.delete()
rec = c.next_dup()
finally:
c.close()
def pack(self, t, referencesf): def pack(self, t, referencesf):
# FIXME # FIXME
......
...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support ...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning. undo or versioning.
""" """
# $Revision: 1.7 $ # $Revision: 1.8 $
__version__ = '0.1' __version__ = '0.1'
import struct import struct
...@@ -33,6 +33,8 @@ from CommitLog import FullLog ...@@ -33,6 +33,8 @@ from CommitLog import FullLog
UNDOABLE_TRANSACTION = 'Y' UNDOABLE_TRANSACTION = 'Y'
PROTECTED_TRANSACTION = 'N' PROTECTED_TRANSACTION = 'N'
zero = '\0'*8
class InternalInconsistencyError(POSException.POSError, AssertionError): class InternalInconsistencyError(POSException.POSError, AssertionError):
...@@ -127,7 +129,7 @@ class Full(BerkeleyBase): ...@@ -127,7 +129,7 @@ class Full(BerkeleyBase):
# ext is the extra info passed to tpc_finish(). It is a # ext is the extra info passed to tpc_finish(). It is a
# dictionary that we get already pickled by BaseStorage. # dictionary that we get already pickled by BaseStorage.
# #
# txnOids -- {tid -> [oid]} # txnoids -- {tid -> [oid]}
# Maps transaction ids to the oids of the objects modified by the # Maps transaction ids to the oids of the objects modified by the
# transaction. # transaction.
# #
...@@ -147,7 +149,7 @@ class Full(BerkeleyBase): ...@@ -147,7 +149,7 @@ class Full(BerkeleyBase):
self._currentVersions = self._setupDB('currentVersions', db.DB_DUP) self._currentVersions = self._setupDB('currentVersions', db.DB_DUP)
self._metadata = self._setupDB('metadata') self._metadata = self._setupDB('metadata')
self._txnMetadata = self._setupDB('txnMetadata') self._txnMetadata = self._setupDB('txnMetadata')
self._txnOids = self._setupDB('txnOids', db.DB_DUP) self._txnoids = self._setupDB('txnoids', db.DB_DUP)
self._refcounts = self._setupDB('refcounts') self._refcounts = self._setupDB('refcounts')
self._pickleRefcounts = self._setupDB('pickleRefcounts') self._pickleRefcounts = self._setupDB('pickleRefcounts')
# Initialize our cache of the next available version id. # Initialize our cache of the next available version id.
...@@ -168,7 +170,7 @@ class Full(BerkeleyBase): ...@@ -168,7 +170,7 @@ class Full(BerkeleyBase):
self._currentVersions.close() self._currentVersions.close()
self._metadata.close() self._metadata.close()
self._txnMetadata.close() self._txnMetadata.close()
self._txnOids.close() self._txnoids.close()
self._refcounts.close() self._refcounts.close()
self._pickleRefcounts.close() self._pickleRefcounts.close()
BerkeleyBase.close(self) BerkeleyBase.close(self)
...@@ -189,6 +191,7 @@ class Full(BerkeleyBase): ...@@ -189,6 +191,7 @@ class Full(BerkeleyBase):
self._commitlog.promise() self._commitlog.promise()
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
global zero
# This is called from the storage interface's tpc_finish() method. # This is called from the storage interface's tpc_finish() method.
# Its responsibilities are to finish the transaction with the # Its responsibilities are to finish the transaction with the
# underlying database. # underlying database.
...@@ -212,7 +215,6 @@ class Full(BerkeleyBase): ...@@ -212,7 +215,6 @@ class Full(BerkeleyBase):
# u is the user associated with the transaction # u is the user associated with the transaction
# d is the description of the transaction # d is the description of the transaction
# e is the transaction extension # e is the transaction extension
zero = '\0'*8
txn = self._env.txn_begin() txn = self._env.txn_begin()
try: try:
# Update the transaction metadata # Update the transaction metadata
...@@ -252,25 +254,29 @@ class Full(BerkeleyBase): ...@@ -252,25 +254,29 @@ class Full(BerkeleyBase):
# pickle. # pickle.
self._pickles.put(key, pickle, txn=txn) self._pickles.put(key, pickle, txn=txn)
lrevid = tid lrevid = tid
# Boost the refcount of all the objects referred to by
# this pickle. referencesf() scans a pickle and
# returns the list of objects referenced by the
# pickle. BAW: the signature of referencesf() has
# changed for Zope 2.4, to make it more convenient to
# use. Gotta stick with the backwards compatible
# version for now.
refdoids = []
referencesf(pickle, refdoids)
for roid in refdoids:
refcount = self._refcounts.get(roid, zero, txn=txn)
refcount = utils.p64(utils.U64(refcount) + 1)
self._refcounts.put(roid, refcount, txn=txn)
# Update the metadata table # Update the metadata table
self._metadata.put(key, vid+nvrevid+tid+prevrevid, txn=txn) self._metadata.put(key, vid+nvrevid+tid+prevrevid, txn=txn)
# If we're in a real version, update this table too # If we're in a real version, update this table too. This
# ends up putting multiple copies of the vid/oid records
# in the table, but it's easier to weed those out later
# than to weed them out now.
if vid <> zero: if vid <> zero:
self._currentVersions.put(vid, oid, txn=txn) self._currentVersions.put(vid, oid, txn=txn)
self._serials.put(oid, tid, txn=txn) self._serials.put(oid, tid, txn=txn)
self._txnOids.put(tid, oid, txn=txn) self._txnoids.put(tid, oid, txn=txn)
# Boost the refcount of all the objects referred to by
# this pickle. referencesf() scans a pickle and returns
# the list of objects referenced by the pickle. BAW: the
# signature of referencesf() has changed for Zope 2.4, to
# make it more convenient to use. Gotta stick with the
# backwards compatible version for now.
refdoids = []
referencesf(pickle, refdoids)
for roid in refdoids:
refcount = self._refcounts.get(roid, zero, txn=txn)
refcount = utils.p64(utils.U64(refcount) + 1)
self._refcounts.put(roid, refcount, txn=txn)
# Update the pickle's reference count. Remember, the # Update the pickle's reference count. Remember, the
# refcount is stored as a string, so we have to do the # refcount is stored as a string, so we have to do the
# string->long->string dance. # string->long->string dance.
...@@ -285,7 +291,15 @@ class Full(BerkeleyBase): ...@@ -285,7 +291,15 @@ class Full(BerkeleyBase):
elif op == 'd': elif op == 'd':
# This is a "delete-a-version" record # This is a "delete-a-version" record
vid = data[0] vid = data[0]
self._currentVersions.delete(vid, txn=txn) c = self._currentVersions.cursor(txn=txn)
try:
rec = c.set(vid)
while rec:
c.delete()
rec = c.next()
finally:
c.close()
except: except:
# If any errors whatsoever occurred, abort the transaction with # If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since # Berkeley, leave the commit log file in the PROMISED state (since
...@@ -303,24 +317,23 @@ class Full(BerkeleyBase): ...@@ -303,24 +317,23 @@ class Full(BerkeleyBase):
# #
def abortVersion(self, version, transaction): def abortVersion(self, version, transaction):
global zero
# Abort the version, but retain enough information to make the abort # Abort the version, but retain enough information to make the abort
# undoable. # undoable.
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
zero = '\0'*8
c = None # the currentVersions cursor c = None # the currentVersions cursor
self._lock_acquire() self._lock_acquire()
try: try:
# The transaction id for this abort
tid = self._serial
# Let KeyErrors percolate up. This is how we ensure that the # Let KeyErrors percolate up. This is how we ensure that the
# version we're aborting is not the empty string. # version we're aborting is not the empty string.
vid = self._vids[version] vid = self._vids[version]
# We need to keep track of the oids that are affected by the abort # We need to keep track of the oids that are affected by the abort
# so that we can return it to the connection, which must # so that we can return it to the connection, which must
# invalidate the objects so they can be reloaded. # invalidate the objects so they can be reloaded. We use a set
oids = [] # here because currentVersions may have duplicate vid/oid records.
oids = {}
c = self._currentVersions.cursor() c = self._currentVersions.cursor()
rec = c.set(vid) rec = c.set(vid)
# Now cruise through all the records for this version, looking for # Now cruise through all the records for this version, looking for
...@@ -333,7 +346,7 @@ class Full(BerkeleyBase): ...@@ -333,7 +346,7 @@ class Full(BerkeleyBase):
oid = rec[1] # ignore the key oid = rec[1] # ignore the key
revid = self._serials[oid] revid = self._serials[oid]
meta = self._metadata[oid+revid] meta = self._metadata[oid+revid]
curvid, nvrevid = struct.unpack('8s8s8s', meta[:16]) curvid, nvrevid = struct.unpack('8s8s', meta[:16])
# Make sure that the vid in the metadata record is the same as # Make sure that the vid in the metadata record is the same as
# the vid we sucked out of the vids table, otherwise we've got # the vid we sucked out of the vids table, otherwise we've got
# an internal database inconsistency. # an internal database inconsistency.
...@@ -345,29 +358,31 @@ class Full(BerkeleyBase): ...@@ -345,29 +358,31 @@ class Full(BerkeleyBase):
continue continue
# Get the non-version data for the object # Get the non-version data for the object
nvmeta = self._metadata[oid+nvrevid] nvmeta = self._metadata[oid+nvrevid]
curvid, nvrevid, lrevid = unpack('8s8s8s', nvmeta[:24]) curvid, nvrevid, lrevid = struct.unpack('8s8s8s', nvmeta[:24])
# We expect curvid to be zero because we just got the # We expect curvid to be zero because we just got the
# non-version entry. # non-version entry.
if curvid <> zero: if curvid <> zero:
raise InternalInconsistencyError raise InternalInconsistencyError
# Write the object id, live revision id, and this transaction # Write the object id, live revision id, the current revision
# id (which serves as the previous revid) to the commit log. # id (which serves as the previous revid to this transaction)
self._commitlog.write_nonversion_object(oid, lrevid, tid) # to the commit log.
self._commitlog.write_nonversion_object(oid, lrevid, revid)
# Remember to return the oid... # Remember to return the oid...
oids.append(oid) oids[oid] = 1
# ...and get the next record for this vid # ...and get the next record for this vid
rec = c.next_dup() rec = c.next()
# We've now processed all the objects on the discarded version, so # We've now processed all the objects on the discarded version, so
# write this to the commit log and return the list of oids to # write this to the commit log and return the list of oids to
# invalidate. # invalidate.
self._commitlog.write_discard_version(vid) self._commitlog.write_discard_version(vid)
return oids return oids.keys()
finally: finally:
if c: if c:
c.close() c.close()
self._lock_release() self._lock_release()
def commitVersion(self, src, dest, transaction): def commitVersion(self, src, dest, transaction):
global zero
# Commit a source version `src' to a destination version `dest'. It's # Commit a source version `src' to a destination version `dest'. It's
# perfectly valid to move an object from one version to another. src # perfectly valid to move an object from one version to another. src
# and dest are version strings, and if we're committing to a # and dest are version strings, and if we're committing to a
...@@ -375,7 +390,6 @@ class Full(BerkeleyBase): ...@@ -375,7 +390,6 @@ class Full(BerkeleyBase):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
zero = '\0'*8
c = None # the currentVersions cursor c = None # the currentVersions cursor
self._lock_acquire() self._lock_acquire()
try: try:
...@@ -414,7 +428,7 @@ class Full(BerkeleyBase): ...@@ -414,7 +428,7 @@ class Full(BerkeleyBase):
# Remember to return the oid... # Remember to return the oid...
oids.append(oid) oids.append(oid)
# ...and get the next record for this vid # ...and get the next record for this vid
rec = c.next_dup() rec = c.next()
# Now that we're done, we can discard this version # Now that we're done, we can discard this version
self._commitlog.write_discard_version(vid) self._commitlog.write_discard_version(vid)
return oids return oids
...@@ -424,6 +438,7 @@ class Full(BerkeleyBase): ...@@ -424,6 +438,7 @@ class Full(BerkeleyBase):
self._lock_release() self._lock_release()
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
global zero
# Return the version string of the version that contains the most # Return the version string of the version that contains the most
# recent change to the object. The empty string means the change # recent change to the object. The empty string means the change
# isn't in a version. # isn't in a version.
...@@ -432,7 +447,7 @@ class Full(BerkeleyBase): ...@@ -432,7 +447,7 @@ class Full(BerkeleyBase):
# Let KeyErrors percolate up # Let KeyErrors percolate up
revid = self._serials[oid] revid = self._serials[oid]
vid = self._metadata[oid+revid][:8] vid = self._metadata[oid+revid][:8]
if vid == '\0'*8: if vid == zero:
# Not in a version # Not in a version
return '' return ''
return self._versions[vid] return self._versions[vid]
...@@ -444,6 +459,7 @@ class Full(BerkeleyBase): ...@@ -444,6 +459,7 @@ class Full(BerkeleyBase):
# #
def load(self, oid, version): def load(self, oid, version):
global zero
# BAW: in the face of application level conflict resolution, it's # BAW: in the face of application level conflict resolution, it's
# /possible/ to load an object that is sitting in the commit log. # /possible/ to load an object that is sitting in the commit log.
# That's bogus though because there's no way to know what to return; # That's bogus though because there's no way to know what to return;
...@@ -468,13 +484,15 @@ class Full(BerkeleyBase): ...@@ -468,13 +484,15 @@ class Full(BerkeleyBase):
# object is living in is equal to the version that's being # object is living in is equal to the version that's being
# requested, then we can simply return the pickle referenced by # requested, then we can simply return the pickle referenced by
# the revid. # the revid.
if vid == '\0'*8 or self._versions[vid] == version: if vid == zero or self._versions[vid] == version:
return self._pickles[oid+lrevid], revid return self._pickles[oid+lrevid], revid
# Otherwise, we recognize that an object cannot be stored in more # Otherwise, we recognize that an object cannot be stored in more
# than one version at a time (although this may change if/when # than one version at a time (although this may change if/when
# "Unlocked" versions are added). So we return the non-version # "Unlocked" versions are added). So we return the non-version
# revision of the object. BAW: should we assert that version is # revision of the object. Make sure the version is empty though.
# empty in this case? if version:
raise POSException.VersionError(
'Undefined version: %s' % version)
lrevid = self._metadata[oid+nvrevid][16:24] lrevid = self._metadata[oid+nvrevid][16:24]
return self._pickles[oid+lrevid], nvrevid return self._pickles[oid+lrevid], nvrevid
finally: finally:
...@@ -488,7 +506,7 @@ class Full(BerkeleyBase): ...@@ -488,7 +506,7 @@ class Full(BerkeleyBase):
# corresponding to the oid and the supplied serial # corresponding to the oid and the supplied serial
# a.k.a. revision. # a.k.a. revision.
lrevid = self._metadata[oid+serial][16:24] lrevid = self._metadata[oid+serial][16:24]
return self._pickle[oid+lrevid] return self._pickles[oid+lrevid]
finally: finally:
self._lock_release() self._lock_release()
...@@ -511,10 +529,12 @@ class Full(BerkeleyBase): ...@@ -511,10 +529,12 @@ class Full(BerkeleyBase):
return vid return vid
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
global zero
# Transaction equivalence guard
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
zero = '\0'*8
self._lock_acquire() self._lock_acquire()
try: try:
# Check for conflict errors. JF says: under some circumstances, # Check for conflict errors. JF says: under some circumstances,
...@@ -567,7 +587,9 @@ class Full(BerkeleyBase): ...@@ -567,7 +587,9 @@ class Full(BerkeleyBase):
# current version. That's a no no. # current version. That's a no no.
raise POSException.VersionLockError( raise POSException.VersionLockError(
'version mismatch for object %s (was: %s, got: %s)' % 'version mismatch for object %s (was: %s, got: %s)' %
map(utils.U64, (oid, ovid, vid))) tuple(map(utils.U64, (oid, ovid, vid))))
else:
nvrevid = onvrevid
# Record the update to this object in the commit log. # Record the update to this object in the commit log.
self._commitlog.write_object(oid, vid, nvrevid, data, oserial) self._commitlog.write_object(oid, vid, nvrevid, data, oserial)
finally: finally:
...@@ -575,191 +597,62 @@ class Full(BerkeleyBase): ...@@ -575,191 +597,62 @@ class Full(BerkeleyBase):
# Return our cached serial number for the object. # Return our cached serial number for the object.
return self._serial return self._serial
def _zaprevision(self, key, txn):
    """Delete one object revision's metadata record, with cascading GC.

    key is oid+tid (16 bytes); txn is the enclosing Berkeley
    transaction.  Deleting the record decrefs the pickle it points to.
    If that pickle's refcount falls to zero, the pickle is collected
    and the refcount of every object it references is decremented in
    turn; any object whose refcount reaches zero has all of its own
    revisions zapped recursively.
    """
    # We only need the lrevid -- the pointer to the pickle -- out of
    # the 32-byte metadata record, laid out as
    # vid+nvrevid+lrevid+prevrevid.  (Unpacking three names from the
    # 8-byte [16:24] slice, as before, raised a ValueError.)
    lrevid = self._metadata.get(key, txn=txn)[16:24]
    # ...and now delete the metadata record for this object revision
    self._metadata.delete(key, txn=txn)
    # Decref the reference count of the pickle pointed to by oid+lrevid.
    # If the reference count goes to zero, we can garbage collect the
    # pickle, and decref all the objects pointed to by the pickle (with of
    # course, cascading garbage collection).
    pkey = key[:8] + lrevid
    refcount = self._pickleRefcounts.get(pkey, txn=txn)
    # It's possible the pickleRefcounts entry for this oid has already
    # been deleted by a previous pass of _zaprevision().  If so, we're
    # done.
    if refcount is None:
        return
    refcount = utils.U64(refcount) - 1
    if refcount > 0:
        self._pickleRefcounts.put(pkey, utils.p64(refcount), txn=txn)
        return
    # The refcount of this pickle has gone to zero, so we need to garbage
    # collect it, and decref all the objects it points to.
    self._pickleRefcounts.delete(pkey, txn=txn)
    pickle = self._pickles.get(pkey, txn=txn)
    # Actually reclaim the pickle's storage too, otherwise the pickle
    # record itself would leak.
    self._pickles.delete(pkey, txn=txn)
    # Sniff the pickle to get the objects it refers to.  The list must
    # be the same one the loop below reads (`oids' was an undefined
    # name here).
    collectables = []
    refoids = []
    referencesf(pickle, refoids)
    # Now decref the reference counts for each of those objects.  If it
    # goes to zero, remember the oid so we can recursively zap its
    # metadata too.
    for oid in refoids:
        refcount = self._refcounts.get(oid, txn=txn)
        refcount = utils.U64(refcount) - 1
        if refcount > 0:
            self._refcounts.put(oid, utils.p64(refcount), txn=txn)
        else:
            collectables.append(oid)
    # Now for all objects whose refcounts just went to zero, we want to
    # delete any records that pertain to this object.  When we get to
    # deleting the metadata record, we'll do it recursively so as to
    # decref any pickles it points to.  For everything else, we'll do it
    # in the most efficient manner possible.
    tids = []
    for oid in collectables:
        self._serials.delete(oid, txn=txn)
        self._refcounts.delete(oid, txn=txn)
        # To delete all the metadata records associated with this object
        # id, we use a trick of Berkeley cursor objects to only partially
        # specify the key.  This works because keys are compared
        # lexically, with shorter keys collating before longer keys.
        # Note: the partial-key trick requires set_range() (DB_SET_RANGE,
        # smallest key >= oid); set() demands an exact match and would
        # never find the 16-byte oid+tid keys.  The while-guard below
        # stops us as soon as we walk past this oid's records.  Open the
        # cursor in the enclosing transaction, like the txnOids cursor
        # below, lest we deadlock against our own write locks.
        c = self._metadata.cursor(txn=txn)
        try:
            rec = c.set_range(oid)
            while rec and rec[0][:8] == oid:
                # Remember the transaction ids so we can clean up the
                # txnOids table below.  Note that we don't record the vids
                # because now that we don't have destructive undo,
                # _zaprevisions() can only be called during a pack() and
                # it is impossible to pack current records (and hence
                # currentVersions).
                tids.append(rec[0][8:])          # second 1/2 of the key
                self._zaprevision(rec[0], txn)
                rec = c.next()
        finally:
            c.close()
        # Delete all the txnOids entries that referenced this oid
        for tid in tids:
            c = self._txnOids.cursor(txn=txn)
            try:
                rec = c.set_both(tid, oid)
                while rec:
                    # Although unlikely, it is possible that an object got
                    # modified more than once in a transaction.
                    c.delete()
                    rec = c.next_dup()
            finally:
                c.close()
def transactionalUndo(self, tid, transaction): def transactionalUndo(self, tid, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
oids = [] newrevs = []
self._lock_acquire() c = None
try:
return oids
finally:
self._lock_release()
# REMOVE ME -- DON'T IMPLEMENT UNDO SINCE WE'RE GOING TO IMPLEMENT
# transactionalUndo() INSTEAD
def undo(self, tid):
# Attempt to undo transaction. NOTE: the current storage interface
# documentation says that this method takes a third argument, which is
# a limit on the number of oids to return. JF says, "Let's get rid of
# the third argument."
c = None # txnOids cursor
oids = []
zero = '\0'*8
self._lock_acquire() self._lock_acquire()
txn = self._env.txn_begin()
try: try:
# Make sure the transaction is undoable. If this transaction # First, make sure the transaction isn't protected by a pack
# occurred earlier than a pack operation, it is no longer status = self._txnMetadata[tid][1]
# undoable. The status flag indicates its undoability.
status = self._txnMetadata.get(tid, txn=txn)[1]
if status == PROTECTED_TRANSACTION: if status == PROTECTED_TRANSACTION:
raise POSException.UndoError, 'Transaction cannot be undone' raise POSException.UndoError, 'Transaction cannot be undone'
# Create the cursor and begin the transaction
c = self._txnOids.cursor()
try:
rec = c.set(tid)
while rec:
oid = rec[1]
oids.append(oid)
# Make sure the tid is current
if self._serials.get(oid, txn=txn) <> tid:
# BAW: we can only undo the most current revision of
# the object???
raise POSException.UndoError(
"Not object's current revision")
# Get rid of the metadata record for this object revision
# and perform cascading decrefs
self._zaprevision(oid, tid, txn)
key = oid + tid # Calculate all the oids modified in the transaction
# Get the metadata for this object revision, and then c = self._txnoids.cursor()
# delete the metadata record. rec = c.set(tid)
vid, nvrevid, lrevid, prevrevid = struct.unpack( while rec:
'8s8s8s8s', self._metadata.get(key, txn=txn)) oid = rec[1]
# Delete the metadata record for this object revision # In order to be able to undo this transaction, we must be
self._metadata.delete(key, txn=txn) # undoing either the current revision of the object, or we
# Decref the reference count of the pickle that we're # must be restoring the exact same pickle (identity compared)
# pointing to and garbage collect it if the refcount falls # that would be restored if we were undoing the current
# to zero. # revision.
self._decref(oid, lrevid, txn) revid = self._serials[oid]
# If the prevrevid is zero, then we've just undone the if revid == tid:
# creation of this object, so we can get rid of its prevrevid = self._metadata[oid+tid][24:]
# serials record. Otherwise, update the serials record to newrevs.append((oid, self._metadata[oid+prevrevid]))
# point to the previous revision of the object. else:
if prevrevid == zero: # Compare the lrevid (pickle pointers) for the current
self._serials.delete(oid, txn=txn) # revision of the object and the revision previous to the
else: # one we're undoing.
self._serials.put(oid, prevrevid, txn=txn) lrevid = self._metadata[oid+revid][16:24]
prec = self._metadata.get(oid+prevrevid, txn=txn) # When we undo this transaction, the previous record will
# BAW: what does it mean if the metadata for the # become the current record.
# previous revision of the object doesn't exist??? prevrevid = self._metadata[oid+tid][24:]
if not prec: # And here's the pickle pointer for that potentially
raise POSException.UndoError( # soon-to-be current record
"No previous revision for object") prevrec = self._metadata[oid+prevrevid]
pvid = prec[:8] if lrevid <> prevrec[16:24]:
# If the version for the previous revision of the # They aren't the same, so we cannot undo this txn
# object is different than the current revision of the raise POSException.UndoError, 'Cannot undo transaction'
# object, then we're undoing past a version creation, newrevs.append((oid, prevrec))
# so we can delete the entry for this vid/oid pair in # Check the next txnoid record
# the currentVersions table. rec = c.next()
if pvid <> vid: # Okay, we've checked all the oids affected by the transaction
# Don't delete the non-version revision of the # we're about to undo, and everything looks good. So now we'll
# object. # write to the log the new object records we intend to commit.
if vid <> zero: c.close()
tmpc = self._currentVersions.cursor(txn=txn) c = None
try: oids = []
rec = tmpc.get_both(vid, oid) for oid, rec in newrevs:
if rec: vid, nvrevid, lrevid, prevrevid = struct.unpack(
tmpc.delete() '8s8s8s8s', rec)
finally: self._commitlog.write_moved_object(oid, vid, nvrevid, lrevid,
tmpc.close() prevrevid)
if pvid <> zero: oids.append(oid)
# Make the previous version the current one
self._currentVersions.put(pvid, oid, txn=txn)
# Finally, delete the transaction metadata associated with
# the transaction we just undid.
self._txnMetadata.delete(tid, txn=txn)
self._txnOids.delete(tid, txn=txn)
except:
txn.abort()
raise
else:
txn.commit()
return oids return oids
finally: finally:
if c: if c:
...@@ -832,13 +725,24 @@ class Full(BerkeleyBase): ...@@ -832,13 +725,24 @@ class Full(BerkeleyBase):
# Return true if version is empty. # Return true if version is empty.
self._lock_acquire() self._lock_acquire()
try: try:
# Let these KeyError exceptions percolate up # First, check if we're querying the empty (i.e. non) version
vid = self._vids[version] if not version:
# But catch these, because it means the version is empty c = self._serials.cursor()
if self._currentVersions.has_key(vid): try:
rec = c.first()
return not rec
finally:
c.close()
# If the named version doesn't exist or there are no objects in
# the version, then return true.
missing = []
vid = self._vids.get(version, missing)
if vid is missing:
return 1 return 1
else: if self._currentVersions.has_key(vid):
return 0 return 0
else:
return 1
finally: finally:
self._lock_release() self._lock_release()
...@@ -875,6 +779,88 @@ class Full(BerkeleyBase): ...@@ -875,6 +779,88 @@ class Full(BerkeleyBase):
tid=self._current[oid] tid=self._current[oid]
finally: self._lock_release() finally: self._lock_release()
def _zaprevision(self, key, txn):
    """Delete one object revision's metadata record, with cascading GC.

    key is oid+tid (16 bytes); txn is the enclosing Berkeley
    transaction.  Deleting the record decrefs the pickle it points to.
    If that pickle's refcount falls to zero, the pickle is collected
    and the refcount of every object it references is decremented in
    turn; any object whose refcount reaches zero has all of its own
    revisions zapped recursively.
    """
    # We only need the lrevid -- the pointer to the pickle -- out of
    # the 32-byte metadata record, laid out as
    # vid+nvrevid+lrevid+prevrevid.  (Unpacking three names from the
    # 8-byte [16:24] slice, as before, raised a ValueError.)
    lrevid = self._metadata.get(key, txn=txn)[16:24]
    # ...and now delete the metadata record for this object revision
    self._metadata.delete(key, txn=txn)
    # Decref the reference count of the pickle pointed to by oid+lrevid.
    # If the reference count goes to zero, we can garbage collect the
    # pickle, and decref all the objects pointed to by the pickle (with of
    # course, cascading garbage collection).
    pkey = key[:8] + lrevid
    refcount = self._pickleRefcounts.get(pkey, txn=txn)
    # It's possible the pickleRefcounts entry for this oid has already
    # been deleted by a previous pass of _zaprevision().  If so, we're
    # done.
    if refcount is None:
        return
    refcount = utils.U64(refcount) - 1
    if refcount > 0:
        self._pickleRefcounts.put(pkey, utils.p64(refcount), txn=txn)
        return
    # The refcount of this pickle has gone to zero, so we need to garbage
    # collect it, and decref all the objects it points to.
    self._pickleRefcounts.delete(pkey, txn=txn)
    pickle = self._pickles.get(pkey, txn=txn)
    # Actually reclaim the pickle's storage too, otherwise the pickle
    # record itself would leak.
    self._pickles.delete(pkey, txn=txn)
    # Sniff the pickle to get the objects it refers to.  The list must
    # be the same one the loop below reads (`oids' was an undefined
    # name here).
    collectables = []
    refoids = []
    referencesf(pickle, refoids)
    # Now decref the reference counts for each of those objects.  If it
    # goes to zero, remember the oid so we can recursively zap its
    # metadata too.
    for oid in refoids:
        refcount = self._refcounts.get(oid, txn=txn)
        refcount = utils.U64(refcount) - 1
        if refcount > 0:
            self._refcounts.put(oid, utils.p64(refcount), txn=txn)
        else:
            collectables.append(oid)
    # Now for all objects whose refcounts just went to zero, we want to
    # delete any records that pertain to this object.  When we get to
    # deleting the metadata record, we'll do it recursively so as to
    # decref any pickles it points to.  For everything else, we'll do it
    # in the most efficient manner possible.
    tids = []
    for oid in collectables:
        self._serials.delete(oid, txn=txn)
        self._refcounts.delete(oid, txn=txn)
        # To delete all the metadata records associated with this object
        # id, we use a trick of Berkeley cursor objects to only partially
        # specify the key.  This works because keys are compared
        # lexically, with shorter keys collating before longer keys.
        # Note: the partial-key trick requires set_range() (DB_SET_RANGE,
        # smallest key >= oid); set() demands an exact match and would
        # never find the 16-byte oid+tid keys.  The while-guard below
        # stops us as soon as we walk past this oid's records.  Open the
        # cursor in the enclosing transaction, like the txnoids cursor
        # below, lest we deadlock against our own write locks.
        c = self._metadata.cursor(txn=txn)
        try:
            rec = c.set_range(oid)
            while rec and rec[0][:8] == oid:
                # Remember the transaction ids so we can clean up the
                # txnoids table below.  Note that we don't record the vids
                # because now that we don't have destructive undo,
                # _zaprevisions() can only be called during a pack() and
                # it is impossible to pack current records (and hence
                # currentVersions).
                tids.append(rec[0][8:])          # second 1/2 of the key
                self._zaprevision(rec[0], txn)
                rec = c.next()
        finally:
            c.close()
        # Delete all the txnoids entries that referenced this oid
        for tid in tids:
            c = self._txnoids.cursor(txn=txn)
            try:
                rec = c.set_both(tid, oid)
                while rec:
                    # Although unlikely, it is possible that an object got
                    # modified more than once in a transaction.
                    c.delete()
                    rec = c.next_dup()
            finally:
                c.close()
def pack(self, t, referencesf): def pack(self, t, referencesf):
# FIXME # FIXME
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment