Commit 27f80e8e authored by Barry Warsaw

A new algorithm for packing which seems much more straightforward.

Here's how it works:

- On every store(), we write an entry to an objrev table containing the
  tuple of information (newserial, oid, oldserial).  We don't write
  this entry if the store is the first revision of an object in a new
  version.

  We do basically the same thing on restore() and transactionalUndo().

- On an abortVersion(), we write two entries to the objrev table: one
  with (newserial, oid, oldserial), which points to the old serial in
  the version, and one with (newserial, oid, nvserial), which points
  to the non-version revision of the version revision.

- On commitVersion(), we do the same as abortVersion() except that we
  don't write the non-version data if we're committing to a different
  version.

- Now, when we pack, all we need to do is cruise from the beginning of
  the objrev table until we find an entry with a newserial > packtime.
  If the oldserial is ZERO, it's an object creation event which we
  don't need to worry about because there's no previous revision.  But
  otherwise, we can delete the oid+oldserial revision because we know
  it's not current.  As we do this, we update pickle refcounts and then
  collect any objects that are left unreferenced (see the sketch after
  this list).
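
Here is a minimal, in-memory sketch of that scan.  The real storage
walks a BerkeleyDB BTree cursor inside a transaction; the plain list
and dict below, and the helper name collect_revs, are stand-ins for
illustration only.  Serials are 8-byte strings that sort in commit
order, and ZERO marks a creation event, as above.

    ZERO = '\0' * 8

    def collect_revs(objrevs, metadata, packtime):
        """Pack away non-current revisions committed at or before packtime.

        objrevs  -- list of (newserial, oid, oldserial) tuples, sorted on
                    newserial (the order the table is written in)
        metadata -- dict mapping oid+serial revision ids to metadata records
        Returns the entries left in the objrev table after the scan.
        """
        remaining = []
        for i, (newserial, oid, oldserial) in enumerate(objrevs):
            if newserial > packtime:
                # Everything from here on is newer than the pack time.
                remaining.extend(objrevs[i:])
                break
            if oldserial != ZERO:
                # There is an older, no-longer-current revision; zap it.
                metadata.pop(oid + oldserial, None)
                # ... here the real code also decrefs the revision's pickle
                # and queues up any objects whose refcounts drop to zero.
        return remaining

A classic pack would follow this scan with the mark and sweep phase;
autopack would stop here.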

The cute thing is that autopacking will use the same algorithm.  The
main difference between autopack and classic pack is that the latter
does a mark and sweep garbage collection phase after the normal objrev
collection phase.  Also, this algorithm means autopack needs only
three pieces of information:

- How often the thread should run (e.g. once per hour)

- How far in the past it should pack (e.g. pack to 4 hours ago).  We
  don't need a start time for the autopack window, because we'll
  always just start at the beginning of the objrev table.

- How often autopack should also do a classic pack (e.g. do a classic
  pack once per day).
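
A sketch of what the autopack thread might look like, driven by those
three knobs.  Autopack doesn't exist in this checkin, so everything
here -- the class, the knob names, and the storage.autopack() call --
is hypothetical illustration, not the eventual API.

    import time
    import threading

    class AutopackThread(threading.Thread):
        """Hypothetical driver for the three autopack knobs above."""

        def __init__(self, storage,
                     frequency=3600,    # run once per hour
                     packhours=4,       # pack to 4 hours ago
                     classic_every=24): # classic pack once per day
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self._storage = storage
            self._frequency = frequency
            self._packhours = packhours
            self._classic_every = classic_every

        def run(self):
            runs = 0
            while True:
                time.sleep(self._frequency)
                runs += 1
                # No window start time is needed; the scan always begins
                # at the front of the objrev table.
                packtime = time.time() - self._packhours * 3600
                # Every classic_every-th run, add the mark and sweep
                # phase; otherwise just scan the objrev table.
                dogc = (runs % self._classic_every == 0)
                self._storage.autopack(packtime, gc=dogc)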

Autopack isn't implemented in this checkin, but I believe it will be
nearly trivial to add.  That comes next.
parent 6108f14f
@@ -15,7 +15,7 @@
 """Berkeley storage with full undo and versioning support.
 """
-__version__ = '$Revision: 1.46 $'.split()[-2:][0]
+__version__ = '$Revision: 1.47 $'.split()[-2:][0]
 import sys
 import time
@@ -24,7 +24,7 @@ from struct import pack, unpack
 # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
 # http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
-# PyBSDDB3.
+# PyBSDDB3. The only recommended version of BerkeleyDB is 4.0.14.
 from bsddb3 import db
 from ZODB import POSException
@@ -41,21 +41,15 @@ import ThreadLock
 # functionality.
 from BerkeleyBase import BerkeleyBase
-# Flags for transaction status in the transaction metadata table. You can
-# only undo back to the last pack, and any transactions before the pack time
-# get marked with the PROTECTED_TRANSACTION flag. An attempt to undo past a
-# PROTECTED_TRANSACTION will raise an POSException.UndoError. By default,
-# transactions are marked with the UNDOABLE_TRANSACTION status flag.
-UNDOABLE_TRANSACTION = 'Y'
-PROTECTED_TRANSACTION = 'N'
 ABORT = 'A'
 COMMIT = 'C'
 PRESENT = 'X'
 ZERO = '\0'*8
+# Special flag for uncreated objects (i.e. Does Not Exist)
 DNE = '\377'*8
 # DEBUGGING
-#DNE = 'nonexist' # does not exist
+#DNE = 'nonexist'
 try:
 # Python 2.2
@@ -91,7 +85,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 #
 # - Object ids (oid) are 8-bytes
 # - Objects have revisions, with each revision being identified by a
-# unique serial number.
+# unique serial number. We sometimes refer to 16-byte strings of
+# oid+serial as a revision id.
 # - Transaction ids (tid) are 8-bytes
 # - Version ids (vid) are 8-bytes
 # - Data pickles are of arbitrary length
@@ -138,16 +133,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # prevrevid is the tid pointing to the previous state of the
 # object. This is used for undo.
 #
-# txnMetadata -- {tid -> status+userlen+desclen+user+desc+ext}
+# txnMetadata -- {tid -> userlen+desclen+user+desc+ext}
 # Maps tids to metadata about a transaction.
 #
-# Status is a 1-character status flag, which is used by the undo
-# mechanism, and has the following values (see constants above):
-# 'N' -- This transaction is "pack protected". You can only
-# undo back to the last pack, and any transactions
-# before the pack time get marked with this flag.
-# 'Y' -- It is okay to undo past this transaction.
-#
 # userlen is the length in characters of the `user' field as an
 # 8-byte unsigned long integer
 # desclen is the length in characters of the `desc' field as an
@@ -210,8 +198,21 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # recovery/restart, all pending data should be committed. Outside
 # of any transaction (e.g. before the tpc_begin()), there will be
 # no pending entry. It is a database invariant that if the
-# pending table is empty, the oids and pvids tables must also be
-# empty.
+# pending table is empty, the oids, pvids, and prevrevids tables
+# must also be empty.
+#
+# packtime -- tid
+# The time of the last pack. It is illegal to undo to before the
+# last pack time.
+#
+# objrevs -- {newserial+oid -> oldserial}
+# This table collects object revision information for packing
+# purposes. Every time a new object revision is committed, we
+# write an entry to this table. When we run pack, we iterate from
+# the start of this table until newserial > packtime, deleting old
+# revisions of objects. Note that when a new revision of an
+# object is first written to a version, no entry is written here.
+# We do write an entry when we commit or abort the version.
 #
 # packmark -- [oid]
 # Every object reachable from the root during a classic pack
@@ -220,10 +221,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # oidqueue -- [oid]
 # This table is a Queue, not a BTree. It is used during the mark
 # phase of pack() and contains a list of oids for work to be done.
-#
-# zaptids -- [tid]
-# This is another queue written during the sweep phase to collect
-# transaction ids that can be packed away.
+# It is also used during pack to list objects for which no more
+# references exist, such that the objects can be completely packed
+# away.
 #
 self._serials = self._setupDB('serials', db.DB_DUP)
 self._pickles = self._setupDB('pickles')
@@ -241,22 +241,20 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 self._txnMetadata = self._setupDB('txnMetadata')
 self._txnoids = self._setupDB('txnoids', db.DB_DUP)
 self._pickleRefcounts = self._setupDB('pickleRefcounts')
-# Table to support packing.
+# Tables to support packing.
+self._objrevs = self._setupDB('objrevs', db.DB_DUP)
 self._packmark = self._setupDB('packmark')
+self._packtime = self._setupDB('packtime')
 self._oidqueue = db.DB(self._env)
 self._oidqueue.set_re_len(8)
 # BAW: do we need to set the queue extent size?
 self._oidqueue.open(self._prefix + 'oidqueue',
 db.DB_QUEUE, db.DB_CREATE)
-self._zaptids = db.DB(self._env)
-self._zaptids.set_re_len(8)
-self._zaptids.open(self._prefix + 'zaptids',
-db.DB_QUEUE, db.DB_CREATE)
+# Do recovery and consistency checks
+self._withlock(self._dorecovery)
 # DEBUGGING
 #self._nextserial = 0L
 # END DEBUGGING
-# Do recovery and consistency checks
-self._withlock(self._dorecovery)
 def _dorecovery(self):
 # If these tables are non-empty, it means we crashed during a pack
@@ -264,7 +262,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # pack operation will reproduce it faithfully.
 self._oidqueue.truncate()
 self._packmark.truncate()
-self._zaptids.truncate()
 # The pendings table may have entries if we crashed before we could
 # abort or commit the outstanding ZODB transaction.
 pendings = self._pending.keys()
@@ -272,6 +269,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 if len(pendings) == 0:
 assert len(self._oids) == 0
 assert len(self._pvids) == 0
+assert len(self._prevrevids) == 0
 else:
 # Do recovery
 tid = pendings[0]
@@ -306,9 +304,10 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 self._txnMetadata.close()
 self._txnoids.close()
 self._pickleRefcounts.close()
+self._objrevs.close()
+self._packtime.close()
 self._packmark.close()
 self._oidqueue.close()
-self._zaptids.close()
 BerkeleyBase.close(self)
 def _withtxn(self, meth, *args, **kws):
@@ -333,6 +332,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 cs = self._serials.cursor(txn=txn)
 ct = self._txnoids.cursor(txn=txn)
 cv = self._currentVersions.cursor(txn=txn)
+cr = self._objrevs.cursor(txn=txn)
 rec = co.first()
 while rec:
 oid = rec[0]
@@ -354,6 +354,13 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 vid = self._metadata[revid][:8]
 self._metadata.delete(revid, txn=txn)
 self._pickles.delete(revid, txn=txn)
+# Clean up the object revisions table
+try:
+cr.set(oid+tid)
+except db.DBNotFoundError:
+pass
+else:
+cr.delete()
 # Now we have to clean up the currentVersions table
 try:
 cv.set_both(vid, revid)
@@ -368,6 +375,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 if co: co.close()
 if cs: cs.close()
 if cv: cv.close()
+if cr: cr.close()
 # Now clean up the vids and versions tables
 cv = self._pvids.cursor(txn=txn)
 try:
@@ -393,6 +401,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # Nothing to abort
 assert len(self._oids) == 0
 assert len(self._pvids) == 0
+assert len(self._prevrevids) == 0
 return
 assert len(pendings) == 1
 tid = pendings[0]
@@ -468,7 +477,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 userlen = len(u)
 desclen = len(d)
 lengths = pack('>II', userlen, desclen)
-data = UNDOABLE_TRANSACTION + lengths + u + d + e
+data = lengths + u + d + e
 self._pending.put(tid, ABORT, txn=txn)
 self._txnMetadata.put(tid, data, txn=txn)
@@ -489,7 +498,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 def _dostore(self, txn, oid, serial, data, version):
 conflictresolved = False
-vid = nvrevid = ZERO
+vid = nvrevid = ovid = ZERO
 # Check for conflict errors. JF says: under some circumstances,
 # it is possible that we'll get two stores for the same object in
 # a single transaction. It's not clear though under what
@@ -547,12 +556,18 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 self._pickles.put(revid, data, txn=txn)
 self._metadata.put(revid, vid+nvrevid+newserial+oserial, txn=txn)
 self._txnoids.put(newserial, oid, txn=txn)
+# Update the object revisions table, but only if this store isn't
+# the first one of this object in a new version.
+if not version or ovid <> ZERO:
+self._objrevs.put(newserial+oid, oserial, txn=txn)
 # Update the log tables
 self._oids.put(oid, PRESENT, txn=txn)
 if vid <> ZERO:
 self._currentVersions.put(vid, revid, txn=txn)
 self._pvids.put(vid, PRESENT, txn=txn)
 # And return the new serial number
+if conflictresolved:
+return ResolvedSerial
 return newserial
 def store(self, oid, serial, data, version, transaction):
@@ -567,7 +582,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 def _dorestore(self, txn, oid, serial, data, version, prev_txn):
 tid = self._serial
-vid = nvrevid = ZERO
+vid = nvrevid = ovid = ZERO
 prevrevid = prev_txn
 # self._serial contains the transaction id as set by
 # BaseStorage.tpc_begin().
@@ -622,6 +637,10 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 self._oids.put(oid, PRESENT, txn=txn)
 if vid <> ZERO:
 self._currentVersions.put(vid, revid, txn=txn)
+# Update the object revisions table, but only if this store isn't
+# the first one of this object in a new version.
+if not version or ovid <> ZERO:
+self._objrevs.put(tid+oid, prevrevid, txn=txn)
 def restore(self, oid, serial, data, version, prev_txn, transaction):
 # A lot like store() but without all the consistency checks. This
@@ -719,11 +738,19 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # while the transaction id is the current transaction. This
 # is the one case where serial <> tid, and a special record
 # must be written to the serials table for this.
-self._serials.put(oid, nvrevid+self._serial, txn=txn)
-self._metadata.put(oid+self._serial, ZERO+ZERO+lrevid+tid,
+newserial = self._serial
+self._serials.put(oid, nvrevid+newserial, txn=txn)
+self._metadata.put(oid+newserial, ZERO+ZERO+lrevid+tid,
 txn=txn)
-self._txnoids.put(self._serial, oid, txn=txn)
+self._txnoids.put(newserial, oid, txn=txn)
 self._oids.put(oid, PRESENT, txn=txn)
+# Now we need to write two records to the object revisions
+# table. First, it's the record containing the previous
+# serial number, and then it's a record containing the
+# non-version serial number (but make sure the object wasn't
+# created in the version).
+self._objrevs.put(newserial+oid, tid, txn=txn)
+self._objrevs.put(newserial+oid, nvrevid, txn=txn)
 c.delete()
 rec = c.next()
 # XXX Should we garbage collect vids and versions? Doing so might
@@ -792,11 +819,20 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # source version.
 if not dest:
 nvrevid = ZERO
-self._serials.put(oid, self._serial, txn=txn)
-self._metadata.put(oid+self._serial, dvid+nvrevid+lrevid+tid,
+newserial = self._serial
+self._serials.put(oid, newserial, txn=txn)
+self._metadata.put(oid+newserial, dvid+nvrevid+lrevid+tid,
 txn=txn)
-self._txnoids.put(self._serial, oid, txn=txn)
+self._txnoids.put(newserial, oid, txn=txn)
 self._oids.put(oid, PRESENT, txn=txn)
+# Now we need to write two records to the object revisions
+# table. First, it's the record containing the previous
+# serial number, and then it's a record containing the
+# non-version serial number. However, if we're committing to
+# a different version, don't write the second record.
+self._objrevs.put(newserial+oid, tid, txn=txn)
+if not dest:
+self._objrevs.put(newserial+oid, nvrevid, txn=txn)
 c.delete()
 rec = c.next()
 return rtnoids.keys()
@@ -1017,6 +1053,15 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 finally:
 self._lock_release()
+def _last_packtime(self):
+packtimes = self._packtime.keys()
+if len(packtimes) == 1:
+return packtimes[0]
+elif len(packtimes) == 0:
+return ZERO
+else:
+assert False, 'too many packtimes'
 #
 # Transactional undo
 #
@@ -1030,7 +1075,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # previous revision of this object.
 mdc = self._metadata.cursor()
 try:
-trec = mdc.set(oid+ctid)
+mdc.set(oid+ctid)
 mrec = mdc.prev()
 if not mrec or mrec[0][:8] <> oid:
 # The previous transaction metadata record doesn't point to
@@ -1091,12 +1136,11 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 def _dotxnundo(self, txn, tid):
 # First, make sure the transaction isn't protected by a pack.
-status = self._txnMetadata[tid][0]
-if status <> UNDOABLE_TRANSACTION:
+packtime = self._last_packtime()
+if tid <= packtime:
 raise POSException.UndoError, 'Transaction cannot be undone'
 # Calculate all the oids of objects modified in this transaction
 newrevs = []
-newstates = []
 c = self._txnoids.cursor(txn=txn)
 try:
 rec = c.set(tid)
@@ -1121,6 +1165,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # new metadata records (and potentially new pickle records).
 rtnoids = {}
 for oid, metadata, data in newrevs:
+newserial = self._serial
 revid = oid + self._serial
 # If the data pickle is None, then this undo is simply
 # re-using a pickle stored earlier. All we need to do then is
@@ -1130,23 +1175,25 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 vid, nvrevid, ign, prevrevid = unpack('>8s8s8s8s', metadata)
 if data is not None:
 self._pickles.put(revid, data, txn=txn)
-metadata = vid+nvrevid+self._serial+prevrevid
+metadata = vid+nvrevid+newserial+prevrevid
 # We need to write all the new records for an object changing in
 # this transaction. Note that we only write to th serials table
 # if prevrevids hasn't already seen this object, otherwise we'll
 # end up with multiple entries in the serials table for the same
 # tid.
 if not self._prevrevids.has_key(oid):
-self._serials.put(oid, self._serial, txn=txn)
+self._serials.put(oid, newserial, txn=txn)
 self._metadata.put(revid, metadata, txn=txn)
 # Only add this oid to txnoids once
 if not rtnoids.has_key(oid):
 self._prevrevids.put(oid, prevrevid, txn=txn)
-self._txnoids.put(self._serial, oid, txn=txn)
+self._txnoids.put(newserial, oid, txn=txn)
 if vid <> ZERO:
 self._currentVersions.put(oid, vid, txn=txn)
 self._oids.put(oid, PRESENT, txn=txn)
 rtnoids[oid] = 1
+# Add this object revision to the autopack table
+self._objrevs.put(newserial+oid, prevrevid, txn=txn)
 return rtnoids.keys()
 def transactionalUndo(self, tid, transaction):
@@ -1159,6 +1206,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 self._lock_release()
 def _doundolog(self, first, last, filter):
+# Get the last packtime
+packtime = self._last_packtime()
 i = 0 # first <= i < last
 txnDescriptions = [] # the return value
 c = self._txnMetadata.cursor()
@@ -1171,13 +1220,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 while rec and i < last:
 tid, txnmeta = rec
 rec = c.prev()
-status = txnmeta[0]
-if status == PROTECTED_TRANSACTION:
+if tid <= packtime:
 break
-userlen, desclen = unpack('>II', txnmeta[1:9])
-user = txnmeta[9:9+userlen]
-desc = txnmeta[9+userlen:9+userlen+desclen]
-ext = txnmeta[9+userlen+desclen:]
+userlen, desclen = unpack('>II', txnmeta[:8])
+user = txnmeta[8:8+userlen]
+desc = txnmeta[8+userlen:8+userlen+desclen]
+ext = txnmeta[8+userlen+desclen:]
 # Create a dictionary for the TransactionDescription
 txndesc = {'id' : tid,
 'time' : TimeStamp(tid).timeTime(),
@@ -1244,9 +1292,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # ...while other information comes out of the transaction
 # metadata.
 txnmeta = self._txnMetadata[tid]
-userlen, desclen = unpack('>II', txnmeta[1:9])
-user = txnmeta[9:9+userlen]
-desc = txnmeta[9+userlen:9+userlen+desclen]
+userlen, desclen = unpack('>II', txnmeta[:8])
+user = txnmeta[8:8+userlen]
+desc = txnmeta[8+userlen:8+userlen+desclen]
 # Now get the pickle size
 data = self._pickles[oid+lrevid]
 # Create a HistoryEntry structure, which turns out to be a
@@ -1276,19 +1324,35 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 self._lock_release()
 #
-# Packing.
+# Packing
 #
-# There are two types of pack operations, the classic pack and autopack.
-# Classic pack is the full blown mark and sweep operation, removing all
-# revisions of all objects not reachable from the root. This can take a
-# long time, although the implementation attempts to mitigate both in-core
-# memory usage and blocking other, non-packing operations.
+# There are two types of pack operations, the classic pack and the
+# autopack. Autopack's sole job is to periodically delete non-current
+# object revisions. It runs in a thread and has an `autopack time' which
+# is essentially just a time in the past at which to autopack to. For
+# example, you might set up autopack to run once per hour, packing away
+# all revisions that are older than 4 hours. Autopack can also be
+# configured to periodically do a classic pack.
 #
-# Autopack is a more lightweight operation. It only removes non-current
-# revisions in a window of transactions, and doesn't do a root
-# reachability test.
+# Classic pack is like autopack -- it packs away old revisions -- but it
+# also does a mark and sweep through all the known objects, looking for
+# those that are not root reachable as of the pack time. Such objects are
+# also packed away even if they have current revisions in the packable
+# transactions, because it means that there is no undo operation that can
+# restore the object's reachability. Remember that you cannot undo
+# previous to the latest pack time.
+#
+# Both packing strategies do reference counting, and the refcounts are
+# sums of the refcounts of all revisions, so if an object's refcount goes
+# to zero, all its object revisions can safely be packed away.
+#
+# We try to interleave BerkeleyDB transactions and non-pack-lock
+# acquisition as granularly as possible so that packing doesn't block
+# other operations for too long. But remember we don't use Berkeley locks
+# so we have to be careful about our application level locks.
 #
+# First, the public API for classic pack
 def pack(self, t, zreferencesf):
 # For all intents and purposes, referencesf here is always going to be
 # the same as ZODB.referencesf.referencesf. It's too much of a PITA
@@ -1301,9 +1365,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 self._packlock.acquire()
 try:
 # We don't wrap this in _withtxn() because we're going to do the
-# operation across several Berkeley transactions. It makes
-# bookkeeping harder, but it also allows other work to happen
-# (stores and reads) while packing is being done.
+# operation across several Berkeley transactions, which allows
+# other work to happen (stores and reads) while packing is being
+# done.
 self._dopack(t)
 finally:
 self._packlock.release()
@@ -1321,39 +1385,193 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 packtime = min(t, time.time())
 t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
 packtid = `t0`
-# Calculate the set of objects reachable from the root. Anything else
-# is a candidate for having all their revisions packed away. The set
-# of reachable objects lives in the _packmark table.
+# Collect all revisions of all objects earlier than the pack time.
+self._lock_acquire()
+try:
+self._withtxn(self._collect_revs, packtid)
+finally:
+self._lock_release()
+# Collect any objects with refcount zero. We do this before the mark
+# and sweep because we're sharing the oidqueue table for two different
+# purposes.
 self._lock_acquire()
 try:
-self._withtxn(self._mark)
+self._withtxn(self._collect_objs)
 finally:
 self._lock_release()
-# Now cruise through all the transactions from the pack time forward,
-# getting rid of any objects not reachable from the root, or any
-# non-current revisions of reachable objects.
+# Do a mark and sweep for garbage collection. Calculate the set of
+# objects reachable from the root. Anything else is a candidate for
+# having all their revisions packed away. The set of reachable
+# objects lives in the _packmark table.
 self._lock_acquire()
 try:
-self._withtxn(self._sweep, end=packtid)
+self._withtxn(self._mark, packtid)
 finally:
 self._lock_release()
-# Now we have the zaptids table which contains a list of all the
-# transactions tha can get packed away. So zap 'em.
+# Now perform a sweep, using oidqueue to hold all object ids for
+# objects which are not root reachable as of the pack time.
 self._lock_acquire()
 try:
-self._withtxn(self._collect)
+self._withtxn(self._sweep, packtid)
+finally:
+self._lock_release()
+# Once again, collect any objects with refcount zero due to the mark
+# and sweep garbage collection pass.
+self._lock_acquire()
+try:
+self._withtxn(self._collect_objs)
 finally:
 self._lock_release()
-def _mark(self, txn):
-# Find the oids for all the objects reachable from the root. To
-# reduce the amount of in-core memory we need do do a pack operation,
-# we'll save the mark data in the packmark table. The oidqueue is a
-# BerkeleyDB Queue that holds the list of object ids to look at next,
-# and by using this we don't need to keep an in-memory dictionary.
+def _collect_revs(self, txn, packtid):
+ct = co = None
+try:
+co = self._objrevs.cursor(txn=txn)
+ct = self._txnoids.cursor(txn=txn)
+rec = co.first()
+while rec:
+revid, oldserial = rec
+newserial = revid[:8]
+oid = revid[8:]
+if newserial > packtid:
+break
+# If the oldserial is ZERO, then this is the first revision of
+# the object, and thus no old revision to pack away. We can
+# delete this record from objrevs so we won't have to deal
+# with it again. Otherwise, we can remove the metadata record
+# for this revision and decref the corresponding pickle.
+if oldserial <> ZERO:
+metadata = self._metadata[oid+oldserial]
+self._metadata.delete(oid+oldserial, txn=txn)
+# Decref the pickle
+self._decrefPickle(oid, metadata[16:24], txn)
+# Remove the txnoids entry. We have to use a cursor here.
+ct.set_both(oldserial, oid)
+ct.delete()
+co.delete()
+rec = co.next()
+finally:
+if co: co.close()
+if ct: ct.close()
+# Note that before we commit this Berkeley transaction, we also need
+# to update the packtime table, so we can't have the possibility of a
+# race condition with undoLog().
+self._packtime.put(packtid, PRESENT, txn=txn)
+def _decrefPickle(self, oid, lrevid, txn):
+if lrevid == DNE:
+# There is no pickle data
+return
+key = oid + lrevid
+refcount = U64(self._pickleRefcounts.get(key, ZERO)) - 1
+if refcount <= 0:
+# We can collect this pickle
+self._pickleRefcounts.delete(key, txn=txn)
+data = self._pickles[key]
+self._pickles.delete(key, txn=txn)
+deltas = {}
+self._update(deltas, data, -1)
+self._decref(deltas, txn)
+else:
+self._pickleRefcounts.put(p64(refcount), txn=txn)
+def _decref(self, deltas, txn):
+for oid, delta in deltas.items():
+refcount = U64(self._refcounts.get(oid, ZERO)) + delta
+if refcount > 0:
+self._refcounts.put(oid, p64(refcount), txn=txn)
+else:
+# This object is no longer referenced by any other object in
+# the system. We can collect all traces of it.
+self._oidqueue.append(oid, txn)
+def _collect_objs(self, txn):
+orec = self._oidqueue.consume()
+while orec:
+oid = orec[1]
+# Delete the object from the serials table
+c = self._serials.cursor(txn)
+try:
+rec = c.set(oid)
+while rec and rec[0] == oid:
+c.delete()
+rec = c.next_dup()
+# We don't need the refcounts any more, but note that if the
+# object was never referenced from another object, there may
+# not be a refcounts entry.
+try:
+self._refcounts.delete(oid, txn=txn)
+except db.DBNotFoundError:
+pass
+finally:
+c.close()
+# Collect all metadata records for this object
+c = self._metadata.cursor(txn)
+try:
+rec = c.set_range(oid)
+while rec and rec[0][:8] == oid:
+revid, metadata = rec
+tid = revid[8:]
+c.delete()
+rec = c.next()
+self._decrefPickle(oid, metadata[16:24], txn)
+# Delete the txnoid entry for this revision
+ct = self._txnoids.cursor(txn=txn)
+try:
+ct.set_both(tid, oid)
+ct.delete()
+finally:
+ct.close()
+# Clean up version related tables
+vid = metadata[:8]
+if vid <> ZERO:
+cv = self._currentVersions.cursor(txn=txn)
+try:
+cv.set_both(vid, revid)
+cv.delete()
+finally:
+cv.close()
+# BAW: maybe we want to refcount vids and versions table
+# entries, but given the rarity of versions, this
+# seems like too much work for too little gain.
+finally:
+c.close()
+# We really do want this down here, since _decrefPickle() could
+# add more items to the queue.
+orec = self._oidqueue.consume()
+assert len(self._oidqueue) == 0
+def _findrev(self, oid, packtid, txn):
+# BAW: Maybe this could probably be more efficient by not doing so
+# much searching, but it would also be more complicated, so the
+# tradeoff should be measured.
+serial = None
+c = self._metadata.cursor(txn=txn)
+try:
+rec = c.set_range(oid)
+while rec:
+revid, metadata = rec
+coid = revid[:8]
+ctid = revid[8:]
+if coid <> oid or ctid > packtid:
+# We found the end of the metadata records for this
+# object prior to the pack time.
+break
+serial = ctid
+rec = c.next()
+finally:
+c.close()
+return serial
+def _mark(self, txn, packtid):
+# Find the oids for all the objects reachable from the root, as of the
+# pack time. To reduce the amount of in-core memory we need do do a
+# pack operation, we'll save the mark data in the packmark table. The
+# oidqueue is a BerkeleyDB Queue that holds the list of object ids to
+# look at next, and by using this we don't need to keep an in-memory
+# dictionary.
 assert len(self._packmark) == 0
 assert len(self._oidqueue) == 0
-assert len(self._zaptids) == 0
 # Quick exit for empty storages
 if not self._serials:
 return
@@ -1367,11 +1585,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 # We've already seen this object
 continue
 self._packmark.put(oid, PRESENT, txn=txn)
-# Get the pickle data for this object's current version
-serial, tid = self._getSerialAndTidMissingOk(oid)
+# Get the pickle data for the most current revision of this object
+# as of the pack time.
+tid = self._findrev(oid, packtid, txn)
 # Say there's no root object (as is the case in some of the unit
 # tests), and we're looking up oid ZERO. Then serial will be None.
-if serial is not None:
+if tid is not None:
 lrevid = self._metadata[oid+tid][16:24]
 data = self._pickles[oid+lrevid]
 # Now get the oids of all the objects referenced by this pickle
@@ -1385,189 +1604,29 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 oid = rec and rec[1]
 assert len(self._oidqueue) == 0
-def _sweep(self, txn, start=None, end=None):
-cm = self._txnMetadata.cursor(txn=txn)
+def _sweep(self, txn, packtid):
+c = self._serials.cursor(txn=txn)
 try:
-# Cruise forward through transactions from the first to the pack
-# time looking for unpacked transactions that have no current
-# records for their objects.
-mrec = None
-if start is not None:
-mrec = cm.set(start)
-if mrec is None:
-mrec = cm.first()
-while mrec:
-tid, metadata = mrec
-if tid > end:
-break
-mrec = cm.next()
-if metadata[0] == PROTECTED_TRANSACTION:
-# This one's already been packed so we can skip it
-continue
-zap = True
-ct = self._txnoids.cursor(txn=txn)
-try:
-rec = ct.set(tid)
-while rec:
-ctid, coid = rec
-rec = ct.next_dup()
-if ctid <> tid:
-break
-serial, otid = self._getSerialAndTid(coid)
-if serial == tid and self._packmark.has_key(coid):
-# This transaction matches the current serial
-# number for an object that is reachable from the
-# root, so we can't pack this transaction.
-zap = False
-break
-if zap:
-self._zaptids.append(tid)
-finally:
-ct.close()
+rec = c.first()
+while rec:
+oid = rec[0]
+rec = c.next()
+serial, tid = self._getSerialAndTid(oid)
+# If the current revision of this object newer than the
+# packtid, we'll ignore this object since we only care about
+# root reachability as of the pack time.
+if tid > packtid:
+continue
+# Otherwise, if packmark (which knows about all the root
+# reachable objects) doesn't have a record for this guy, then
+# we can zap it. Do so by appending to oidqueue.
+if not self._packmark.has_key(oid):
+self._oidqueue.append(oid, txn)
 finally:
-cm.close()
+c.close()
 # We're done with the mark table
 self._packmark.truncate(txn=txn)
-def _collect(self, txn):
-rec = self._zaptids.consume()
-while rec:
-tid = rec[1]
-rec = self._zaptids.consume()
-c = self._txnoids.cursor(txn)
-try:
-trec = c.set(tid)
-while trec and trec[0] == tid:
-oid = trec[1]
-# We can get rid of this txnoids entry
-c.delete()
-trec = c.next_dup()
-# Delete the metadata record
-metadata = self._metadata[oid+tid]
-self._metadata.delete(oid+tid, txn=txn)
-# Decref the pickle
-self._decrefPickle(oid, metadata[16:24], txn)
-finally:
-c.close()
-# Set the status flag on the transaction metadata for this txn
-txnmeta = self._txnMetadata[tid]
-if txnmeta[0] <> PROTECTED_TRANSACTION:
-txnmeta = PROTECTED_TRANSACTION + txnmeta[1:]
-self._txnMetadata.put(tid, txnmeta, txn=txn)
-def _decref(self, deltas, txn):
-for oid, delta in deltas.items():
-refcount = U64(self._refcounts.get(oid, ZERO)) + delta
-if refcount > 0:
-self._refcounts.put(oid, p64(refcount), txn=txn)
-# This object is no longer referenced by any other object in the
-# system. We can collect all traces of it.
-c = self._serials.cursor(txn)
-try:
-rec = c.set(oid)
-while rec and rec[0] == oid:
-c.delete()
-rec = c.next_dup()
-finally:
-c.close()
-# Collect all metadata record that reference this object
-c = self._metadata.cursor(txn)
-try:
-rec = c.set_range(oid)
-while rec and rec[:8] == oid:
-revid, metadata = rec
-c.delete()
-rec = c.next()
-self._decrefPickle(oid, metadata[16:24], txn)
-finally:
-c.close()
-def _decrefPickle(self, oid, lrevid, txn):
-if lrevid == DNE:
-# There is no pickle data
-return
-key = oid + lrevid
-refcount = U64(self._pickleRefcounts.get(key, ZERO)) - 1
-if refcount <= 0:
-# We can collect this pickle
-self._pickleRefcounts.delete(key, txn=txn)
-data = self._pickles[key]
-self._pickles.delete(key, txn=txn)
-deltas = {}
-self._update(deltas, data, -1)
-self._decref(deltas, txn)
-else:
-self._pickleRefcounts.put(p64(refcount), txn=txn)
-#
-# GCable interface, for cyclic garbage collection (untested)
-#
-def gcTrash(oids):
-"""Given a list of oids, treat them as trash.
-This means they can be garbage collected, with all necessary cascading
-reference counting performed
-"""
-# BAW: this is broken -- make it look like the end of pack()
-self._lock_acquire()
-c = None
-try:
-c = self._metadata.cursor()
-for oid in oids:
-# Convert to a string
-oid = p64(oid)
-# Delete all the metadata records
-rec = c.set(oid)
-while rec:
-key, data = rec
-rec = c.next_dup()
-self._zaprevision(key)
-finally:
-if c:
-c.close()
-self._lock_release()
-def gcRefcount(oid):
-"""Return the reference count of the specified object.
-Raises KeyError if there is no object with oid. Both the oid argument
-and the returned reference count are integers.
-"""
-self._lock_acquire()
-try:
-return U64(self._refcounts[p64(oid)])
-finally:
-self._lock_release()
-def gcReferences(oid):
-"""Return a list of oids that the specified object refers to.
-Raises KeyError if there is no object with oid. The oid argument
-is an integer; the return value is a list of integers of oids.
-"""
-oids = []
-c = None
-self._lock_acquire()
-try:
-c = self._pickles.cursor()
-rec = c.set(p64(oid))
-while rec:
-# We don't care about the key
-pickle = rec[1]
-rec = c.next_dup()
-# Sniff the pickle for object references
-tmpoids = []
-referencesf(pickle, tmpoids)
-# Convert to unsigned longs
-oids.extend(map(U64, tmpoids))
-# Make sure there's no duplicates, and convert to int
-return oids
-finally:
-if c:
-c.close()
-self._lock_release()
 #
 # Iterator protocol
 #
@@ -1602,12 +1661,16 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
 tid, data = rec
 # Now unpack the necessary information. Don't impedence match the
 # status flag (that's done by the caller).
-status = data[0]
-userlen, desclen = unpack('>II', data[1:9])
-user = data[9:9+userlen]
-desc = data[9+userlen:9+userlen+desclen]
-ext = data[9+userlen+desclen:]
-return tid, status, user, desc, ext
+packtime = self._last_packtime()
+if tid <= packtime:
+packedp = True
+else:
+packedp = False
+userlen, desclen = unpack('>II', data[:8])
+user = data[8:8+userlen]
+desc = data[8+userlen:8+userlen+desclen]
+ext = data[8+userlen+desclen:]
+return tid, packedp, user, desc, ext
 finally:
 if c:
 c.close()
@@ -1678,14 +1741,14 @@ class _TransactionsIterator(_GetItemBase):
 if self._closed:
 raise IOError, 'iterator is closed'
 # Let IndexErrors percolate up.
-tid, status, user, desc, ext = self._storage._nexttxn(
+tid, packedp, user, desc, ext = self._storage._nexttxn(
 self._tid, self._first)
 self._first = False
 # Did we reach the specified end?
 if self._stop is not None and tid > self._stop:
 raise IndexError
 self._tid = tid
-return _RecordsIterator(self._storage, tid, status, user, desc, ext)
+return _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
 def close(self):
 self._closed = True
@@ -1715,14 +1778,14 @@ class _RecordsIterator(_GetItemBase):
 description = None
 _extension = None
-def __init__(self, storage, tid, status, user, desc, ext):
+def __init__(self, storage, tid, packedp, user, desc, ext):
 self._storage = storage
 self.tid = tid
 # Impedence matching
-if status == UNDOABLE_TRANSACTION:
-self.status = ' '
-else:
-self.status = 'p'
+if packedp:
+self.status = 'p'
+else:
+self.status = ' '
 self.user = user
 self.description = desc
 self._extension = ext
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support. """Berkeley storage with full undo and versioning support.
""" """
__version__ = '$Revision: 1.46 $'.split()[-2:][0] __version__ = '$Revision: 1.47 $'.split()[-2:][0]
import sys import sys
import time import time
...@@ -24,7 +24,7 @@ from struct import pack, unpack ...@@ -24,7 +24,7 @@ from struct import pack, unpack
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net. It is compatible with release 3.4 of # http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
# PyBSDDB3. # PyBSDDB3. The only recommended version of BerkeleyDB is 4.0.14.
from bsddb3 import db from bsddb3 import db
from ZODB import POSException from ZODB import POSException
...@@ -41,21 +41,15 @@ import ThreadLock ...@@ -41,21 +41,15 @@ import ThreadLock
# functionality. # functionality.
from BerkeleyBase import BerkeleyBase from BerkeleyBase import BerkeleyBase
# Flags for transaction status in the transaction metadata table. You can
# only undo back to the last pack, and any transactions before the pack time
# get marked with the PROTECTED_TRANSACTION flag. An attempt to undo past a
# PROTECTED_TRANSACTION will raise an POSException.UndoError. By default,
# transactions are marked with the UNDOABLE_TRANSACTION status flag.
UNDOABLE_TRANSACTION = 'Y'
PROTECTED_TRANSACTION = 'N'
ABORT = 'A' ABORT = 'A'
COMMIT = 'C' COMMIT = 'C'
PRESENT = 'X' PRESENT = 'X'
ZERO = '\0'*8 ZERO = '\0'*8
# Special flag for uncreated objects (i.e. Does Not Exist)
DNE = '\377'*8 DNE = '\377'*8
# DEBUGGING # DEBUGGING
#DNE = 'nonexist' # does not exist #DNE = 'nonexist'
try: try:
# Python 2.2 # Python 2.2
...@@ -91,7 +85,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -91,7 +85,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# #
# - Object ids (oid) are 8-bytes # - Object ids (oid) are 8-bytes
# - Objects have revisions, with each revision being identified by a # - Objects have revisions, with each revision being identified by a
# unique serial number. # unique serial number. We sometimes refer to 16-byte strings of
# oid+serial as a revision id.
# - Transaction ids (tid) are 8-bytes # - Transaction ids (tid) are 8-bytes
# - Version ids (vid) are 8-bytes # - Version ids (vid) are 8-bytes
# - Data pickles are of arbitrary length # - Data pickles are of arbitrary length
...@@ -138,16 +133,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -138,16 +133,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# prevrevid is the tid pointing to the previous state of the # prevrevid is the tid pointing to the previous state of the
# object. This is used for undo. # object. This is used for undo.
# #
# txnMetadata -- {tid -> status+userlen+desclen+user+desc+ext} # txnMetadata -- {tid -> userlen+desclen+user+desc+ext}
# Maps tids to metadata about a transaction. # Maps tids to metadata about a transaction.
# #
# Status is a 1-character status flag, which is used by the undo
# mechanism, and has the following values (see constants above):
# 'N' -- This transaction is "pack protected". You can only
# undo back to the last pack, and any transactions
# before the pack time get marked with this flag.
# 'Y' -- It is okay to undo past this transaction.
#
# userlen is the length in characters of the `user' field as an # userlen is the length in characters of the `user' field as an
# 8-byte unsigned long integer # 8-byte unsigned long integer
# desclen is the length in characters of the `desc' field as an # desclen is the length in characters of the `desc' field as an
...@@ -210,8 +198,21 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -210,8 +198,21 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# recovery/restart, all pending data should be committed. Outside # recovery/restart, all pending data should be committed. Outside
# of any transaction (e.g. before the tpc_begin()), there will be # of any transaction (e.g. before the tpc_begin()), there will be
# no pending entry. It is a database invariant that if the # no pending entry. It is a database invariant that if the
# pending table is empty, the oids and pvids tables must also be # pending table is empty, the oids, pvids, and prevrevids tables
# empty. # must also be empty.
#
# packtime -- tid
# The time of the last pack. It is illegal to undo to before the
# last pack time.
#
# objrevs -- {newserial+oid -> oldserial}
# This table collects object revision information for packing
# purposes. Every time a new object revision is committed, we
# write an entry to this table. When we run pack, we iterate from
# the start of this table until newserial > packtime, deleting old
# revisions of objects. Note that when a new revision of an
# object is first written to a version, no entry is written here.
# We do write an entry when we commit or abort the version.
# #
# packmark -- [oid] # packmark -- [oid]
# Every object reachable from the root during a classic pack # Every object reachable from the root during a classic pack
...@@ -220,10 +221,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -220,10 +221,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# oidqueue -- [oid] # oidqueue -- [oid]
# This table is a Queue, not a BTree. It is used during the mark # This table is a Queue, not a BTree. It is used during the mark
# phase of pack() and contains a list of oids for work to be done. # phase of pack() and contains a list of oids for work to be done.
# # It is also used during pack to list objects for which no more
# zaptids -- [tid] # references exist, such that the objects can be completely packed
# This is another queue written during the sweep phase to collect # away.
# transaction ids that can be packed away.
# #
self._serials = self._setupDB('serials', db.DB_DUP) self._serials = self._setupDB('serials', db.DB_DUP)
self._pickles = self._setupDB('pickles') self._pickles = self._setupDB('pickles')
...@@ -241,22 +241,20 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -241,22 +241,20 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._txnMetadata = self._setupDB('txnMetadata') self._txnMetadata = self._setupDB('txnMetadata')
self._txnoids = self._setupDB('txnoids', db.DB_DUP) self._txnoids = self._setupDB('txnoids', db.DB_DUP)
self._pickleRefcounts = self._setupDB('pickleRefcounts') self._pickleRefcounts = self._setupDB('pickleRefcounts')
# Table to support packing. # Tables to support packing.
self._objrevs = self._setupDB('objrevs', db.DB_DUP)
self._packmark = self._setupDB('packmark') self._packmark = self._setupDB('packmark')
self._packtime = self._setupDB('packtime')
self._oidqueue = db.DB(self._env) self._oidqueue = db.DB(self._env)
self._oidqueue.set_re_len(8) self._oidqueue.set_re_len(8)
# BAW: do we need to set the queue extent size? # BAW: do we need to set the queue extent size?
self._oidqueue.open(self._prefix + 'oidqueue', self._oidqueue.open(self._prefix + 'oidqueue',
db.DB_QUEUE, db.DB_CREATE) db.DB_QUEUE, db.DB_CREATE)
self._zaptids = db.DB(self._env) # Do recovery and consistency checks
self._zaptids.set_re_len(8) self._withlock(self._dorecovery)
self._zaptids.open(self._prefix + 'zaptids',
db.DB_QUEUE, db.DB_CREATE)
# DEBUGGING # DEBUGGING
#self._nextserial = 0L #self._nextserial = 0L
# END DEBUGGING # END DEBUGGING
# Do recovery and consistency checks
self._withlock(self._dorecovery)
def _dorecovery(self): def _dorecovery(self):
# If these tables are non-empty, it means we crashed during a pack # If these tables are non-empty, it means we crashed during a pack
...@@ -264,7 +262,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -264,7 +262,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# pack operation will reproduce it faithfully. # pack operation will reproduce it faithfully.
self._oidqueue.truncate() self._oidqueue.truncate()
self._packmark.truncate() self._packmark.truncate()
self._zaptids.truncate()
# The pendings table may have entries if we crashed before we could # The pendings table may have entries if we crashed before we could
# abort or commit the outstanding ZODB transaction. # abort or commit the outstanding ZODB transaction.
pendings = self._pending.keys() pendings = self._pending.keys()
...@@ -272,6 +269,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -272,6 +269,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
if len(pendings) == 0: if len(pendings) == 0:
assert len(self._oids) == 0 assert len(self._oids) == 0
assert len(self._pvids) == 0 assert len(self._pvids) == 0
assert len(self._prevrevids) == 0
else: else:
# Do recovery # Do recovery
tid = pendings[0] tid = pendings[0]
...@@ -306,9 +304,10 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -306,9 +304,10 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._txnMetadata.close() self._txnMetadata.close()
self._txnoids.close() self._txnoids.close()
self._pickleRefcounts.close() self._pickleRefcounts.close()
self._objrevs.close()
self._packtime.close()
self._packmark.close() self._packmark.close()
self._oidqueue.close() self._oidqueue.close()
self._zaptids.close()
BerkeleyBase.close(self) BerkeleyBase.close(self)
     def _withtxn(self, meth, *args, **kws):
@@ -333,6 +332,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         cs = self._serials.cursor(txn=txn)
         ct = self._txnoids.cursor(txn=txn)
         cv = self._currentVersions.cursor(txn=txn)
+        cr = self._objrevs.cursor(txn=txn)
         rec = co.first()
         while rec:
             oid = rec[0]
@@ -354,6 +354,13 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             vid = self._metadata[revid][:8]
             self._metadata.delete(revid, txn=txn)
             self._pickles.delete(revid, txn=txn)
+            # Clean up the object revisions table
+            try:
+                cr.set(oid+tid)
+            except db.DBNotFoundError:
+                pass
+            else:
+                cr.delete()
             # Now we have to clean up the currentVersions table
             try:
                 cv.set_both(vid, revid)
@@ -368,6 +375,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             if co: co.close()
             if cs: cs.close()
             if cv: cv.close()
+            if cr: cr.close()
         # Now clean up the vids and versions tables
         cv = self._pvids.cursor(txn=txn)
         try:
@@ -393,6 +401,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             # Nothing to abort
             assert len(self._oids) == 0
             assert len(self._pvids) == 0
+            assert len(self._prevrevids) == 0
             return
         assert len(pendings) == 1
         tid = pendings[0]
@@ -468,7 +477,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         userlen = len(u)
         desclen = len(d)
         lengths = pack('>II', userlen, desclen)
-        data = UNDOABLE_TRANSACTION + lengths + u + d + e
+        data = lengths + u + d + e
         self._pending.put(tid, ABORT, txn=txn)
         self._txnMetadata.put(tid, data, txn=txn)
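With the status byte gone, a transaction metadata record is now just two
big-endian lengths followed by the user, description, and extension strings.
A minimal sketch of encoding and decoding that layout (the helper names are
illustrative, not part of the module):

    from struct import pack, unpack

    def encode_txnmeta(user, desc, ext):
        # '>II' == two big-endian unsigned 32-bit lengths
        return pack('>II', len(user), len(desc)) + user + desc + ext

    def decode_txnmeta(data):
        userlen, desclen = unpack('>II', data[:8])
        user = data[8:8+userlen]
        desc = data[8+userlen:8+userlen+desclen]
        ext = data[8+userlen+desclen:]
        return user, desc, ext

    assert decode_txnmeta(encode_txnmeta('barry', 'a pack', '')) == \
           ('barry', 'a pack', '')

The 8-byte offsets match the unpack() calls in _doundolog() and history()
further down.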
@@ -489,7 +498,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
     def _dostore(self, txn, oid, serial, data, version):
         conflictresolved = False
-        vid = nvrevid = ZERO
+        vid = nvrevid = ovid = ZERO
         # Check for conflict errors.  JF says: under some circumstances,
         # it is possible that we'll get two stores for the same object in
         # a single transaction.  It's not clear though under what
@@ -547,12 +556,18 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         self._pickles.put(revid, data, txn=txn)
         self._metadata.put(revid, vid+nvrevid+newserial+oserial, txn=txn)
         self._txnoids.put(newserial, oid, txn=txn)
+        # Update the object revisions table, but only if this store isn't
+        # the first one of this object in a new version.
+        if not version or ovid <> ZERO:
+            self._objrevs.put(newserial+oid, oserial, txn=txn)
         # Update the log tables
         self._oids.put(oid, PRESENT, txn=txn)
         if vid <> ZERO:
             self._currentVersions.put(vid, revid, txn=txn)
             self._pvids.put(vid, PRESENT, txn=txn)
         # And return the new serial number
+        if conflictresolved:
+            return ResolvedSerial
         return newserial
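Every store that isn't an object's first revision in a new version now leaves
a breadcrumb for the packer: a 16-byte key (8-byte new serial followed by the
8-byte oid) mapped to the serial it supersedes.  Since serials are big-endian,
a plain forward cursor scan of the table visits revisions in commit order.  A
tiny sketch of building and splitting those keys (the byte values are
hand-rolled stand-ins):

    ZERO = '\0' * 8
    newserial = '\0' * 7 + '\2'     # 8-byte big-endian serials sort
    oldserial = '\0' * 7 + '\1'     # chronologically as raw strings
    oid = '\0' * 7 + '\7'

    key, value = newserial + oid, oldserial
    # The packer later splits the key back apart:
    assert (key[:8], key[8:], value) == (newserial, oid, oldserial)
    # An object creation event records ZERO -- nothing to pack away:
    creation = (newserial + oid, ZERO)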
     def store(self, oid, serial, data, version, transaction):
@@ -567,7 +582,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
     def _dorestore(self, txn, oid, serial, data, version, prev_txn):
         tid = self._serial
-        vid = nvrevid = ZERO
+        vid = nvrevid = ovid = ZERO
         prevrevid = prev_txn
         # self._serial contains the transaction id as set by
         # BaseStorage.tpc_begin().
@@ -622,6 +637,10 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         self._oids.put(oid, PRESENT, txn=txn)
         if vid <> ZERO:
             self._currentVersions.put(vid, revid, txn=txn)
+        # Update the object revisions table, but only if this store isn't
+        # the first one of this object in a new version.
+        if not version or ovid <> ZERO:
+            self._objrevs.put(tid+oid, prevrevid, txn=txn)

     def restore(self, oid, serial, data, version, prev_txn, transaction):
         # A lot like store() but without all the consistency checks.  This
@@ -719,11 +738,19 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             # while the transaction id is the current transaction.  This
             # is the one case where serial <> tid, and a special record
             # must be written to the serials table for this.
-            self._serials.put(oid, nvrevid+self._serial, txn=txn)
-            self._metadata.put(oid+self._serial, ZERO+ZERO+lrevid+tid,
+            newserial = self._serial
+            self._serials.put(oid, nvrevid+newserial, txn=txn)
+            self._metadata.put(oid+newserial, ZERO+ZERO+lrevid+tid,
                                txn=txn)
-            self._txnoids.put(self._serial, oid, txn=txn)
+            self._txnoids.put(newserial, oid, txn=txn)
             self._oids.put(oid, PRESENT, txn=txn)
+            # Now we need to write two records to the object revisions
+            # table.  First, it's the record containing the previous
+            # serial number, and then it's a record containing the
+            # non-version serial number (but make sure the object wasn't
+            # created in the version).
+            self._objrevs.put(newserial+oid, tid, txn=txn)
+            self._objrevs.put(newserial+oid, nvrevid, txn=txn)
             c.delete()
             rec = c.next()
         # XXX Should we garbage collect vids and versions?  Doing so might
@@ -792,11 +819,20 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             # source version.
             if not dest:
                 nvrevid = ZERO
-            self._serials.put(oid, self._serial, txn=txn)
-            self._metadata.put(oid+self._serial, dvid+nvrevid+lrevid+tid,
+            newserial = self._serial
+            self._serials.put(oid, newserial, txn=txn)
+            self._metadata.put(oid+newserial, dvid+nvrevid+lrevid+tid,
                                txn=txn)
-            self._txnoids.put(self._serial, oid, txn=txn)
+            self._txnoids.put(newserial, oid, txn=txn)
             self._oids.put(oid, PRESENT, txn=txn)
+            # Now we need to write two records to the object revisions
+            # table.  First, it's the record containing the previous
+            # serial number, and then it's a record containing the
+            # non-version serial number.  However, if we're committing to
+            # a different version, don't write the second record.
+            self._objrevs.put(newserial+oid, tid, txn=txn)
+            if not dest:
+                self._objrevs.put(newserial+oid, nvrevid, txn=txn)
             c.delete()
             rec = c.next()
         return rtnoids.keys()
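Both version operations can write two objrevs records under the same
newserial+oid key, so the table has to be opened with duplicate-key support,
and reading the entries back takes a duplicate-aware cursor.  A sketch of that
access pattern using the bsddb3 module the storage already imports (the
environment and table setup here are assumptions for the demo, not the
module's _setupDBs code):

    import os
    from bsddb3 import db

    envdir = '/tmp/objrevs-demo'
    if not os.path.isdir(envdir):
        os.makedirs(envdir)
    env = db.DBEnv()
    env.open(envdir, db.DB_CREATE | db.DB_INIT_MPOOL)
    objrevs = db.DB(env)
    objrevs.set_flags(db.DB_DUP)            # allow duplicate keys
    objrevs.open('objrevs', db.DB_BTREE, db.DB_CREATE)

    key = '\0' * 7 + '\2' + '\0' * 7 + '\7' # newserial + oid
    objrevs.put(key, '\0' * 7 + '\1')       # old serial in the version
    objrevs.put(key, '\0' * 7 + '\3')       # non-version serial

    c = objrevs.cursor()
    rec, count = c.set(key), 0
    while rec:
        count += 1
        rec = c.next_dup()                  # visit each duplicate in turn
    c.close()
    assert count == 2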
@@ -1017,6 +1053,15 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         finally:
             self._lock_release()
+    def _last_packtime(self):
+        packtimes = self._packtime.keys()
+        if len(packtimes) == 1:
+            return packtimes[0]
+        elif len(packtimes) == 0:
+            return ZERO
+        else:
+            assert False, 'too many packtimes'
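Pack protection is now a raw byte-string comparison, tid <= packtime, rather
than a per-transaction status flag.  That is sound because transaction ids are
8-byte big-endian values, so lexicographic order on the raw strings agrees
with numeric, and therefore chronological, order.  A quick self-contained
check (this p64 is a stand-in for the module's helper of the same name):

    from struct import pack

    def p64(n):
        # 8 bytes, big-endian -- the layout used for serials and tids
        return pack('>Q', n)

    assert p64(4) < p64(5) < p64(6)
    assert not p64(300) <= p64(299)
    assert sorted([p64(10), p64(2)]) == [p64(2), p64(10)]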
     #
     # Transactional undo
     #
@@ -1030,7 +1075,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         # previous revision of this object.
         mdc = self._metadata.cursor()
         try:
-            trec = mdc.set(oid+ctid)
+            mdc.set(oid+ctid)
             mrec = mdc.prev()
             if not mrec or mrec[0][:8] <> oid:
                 # The previous transaction metadata record doesn't point to
@@ -1091,12 +1136,11 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
     def _dotxnundo(self, txn, tid):
         # First, make sure the transaction isn't protected by a pack.
-        status = self._txnMetadata[tid][0]
-        if status <> UNDOABLE_TRANSACTION:
+        packtime = self._last_packtime()
+        if tid <= packtime:
             raise POSException.UndoError, 'Transaction cannot be undone'
         # Calculate all the oids of objects modified in this transaction
         newrevs = []
-        newstates = []
         c = self._txnoids.cursor(txn=txn)
         try:
             rec = c.set(tid)
@@ -1121,6 +1165,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         # new metadata records (and potentially new pickle records).
         rtnoids = {}
         for oid, metadata, data in newrevs:
+            newserial = self._serial
             revid = oid + self._serial
             # If the data pickle is None, then this undo is simply
             # re-using a pickle stored earlier.  All we need to do then is
@@ -1130,23 +1175,25 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             vid, nvrevid, ign, prevrevid = unpack('>8s8s8s8s', metadata)
             if data is not None:
                 self._pickles.put(revid, data, txn=txn)
-                metadata = vid+nvrevid+self._serial+prevrevid
+                metadata = vid+nvrevid+newserial+prevrevid
             # We need to write all the new records for an object changing in
             # this transaction.  Note that we only write to the serials table
             # if prevrevids hasn't already seen this object, otherwise we'll
             # end up with multiple entries in the serials table for the same
             # tid.
             if not self._prevrevids.has_key(oid):
-                self._serials.put(oid, self._serial, txn=txn)
+                self._serials.put(oid, newserial, txn=txn)
             self._metadata.put(revid, metadata, txn=txn)
             # Only add this oid to txnoids once
             if not rtnoids.has_key(oid):
                 self._prevrevids.put(oid, prevrevid, txn=txn)
-                self._txnoids.put(self._serial, oid, txn=txn)
+                self._txnoids.put(newserial, oid, txn=txn)
                 if vid <> ZERO:
                     self._currentVersions.put(oid, vid, txn=txn)
                 self._oids.put(oid, PRESENT, txn=txn)
                 rtnoids[oid] = 1
+            # Add this object revision to the autopack table
+            self._objrevs.put(newserial+oid, prevrevid, txn=txn)
         return rtnoids.keys()
     def transactionalUndo(self, tid, transaction):
@@ -1159,6 +1206,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             self._lock_release()

     def _doundolog(self, first, last, filter):
+        # Get the last packtime
+        packtime = self._last_packtime()
         i = 0                                  # first <= i < last
         txnDescriptions = []                   # the return value
         c = self._txnMetadata.cursor()
@@ -1171,13 +1220,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         while rec and i < last:
             tid, txnmeta = rec
             rec = c.prev()
-            status = txnmeta[0]
-            if status == PROTECTED_TRANSACTION:
+            if tid <= packtime:
                 break
-            userlen, desclen = unpack('>II', txnmeta[1:9])
-            user = txnmeta[9:9+userlen]
-            desc = txnmeta[9+userlen:9+userlen+desclen]
-            ext = txnmeta[9+userlen+desclen:]
+            userlen, desclen = unpack('>II', txnmeta[:8])
+            user = txnmeta[8:8+userlen]
+            desc = txnmeta[8+userlen:8+userlen+desclen]
+            ext = txnmeta[8+userlen+desclen:]
             # Create a dictionary for the TransactionDescription
             txndesc = {'id'   : tid,
                        'time' : TimeStamp(tid).timeTime(),
@@ -1244,9 +1292,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             # ...while other information comes out of the transaction
             # metadata.
             txnmeta = self._txnMetadata[tid]
-            userlen, desclen = unpack('>II', txnmeta[1:9])
-            user = txnmeta[9:9+userlen]
-            desc = txnmeta[9+userlen:9+userlen+desclen]
+            userlen, desclen = unpack('>II', txnmeta[:8])
+            user = txnmeta[8:8+userlen]
+            desc = txnmeta[8+userlen:8+userlen+desclen]
             # Now get the pickle size
             data = self._pickles[oid+lrevid]
             # Create a HistoryEntry structure, which turns out to be a
@@ -1276,19 +1324,35 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             self._lock_release()
     #
-    # Packing.
+    # Packing
     #
-    # There are two types of pack operations, the classic pack and autopack.
-    # Classic pack is the full blown mark and sweep operation, removing all
-    # revisions of all objects not reachable from the root.  This can take a
-    # long time, although the implementation attempts to mitigate both in-core
-    # memory usage and blocking other, non-packing operations.
-    #
-    # Autopack is a more lightweight operation.  It only removes non-current
-    # revisions in a window of transactions, and doesn't do a root
-    # reachability test.
+    # There are two types of pack operations, the classic pack and the
+    # autopack.  Autopack's sole job is to periodically delete non-current
+    # object revisions.  It runs in a thread and has an `autopack time' which
+    # is essentially just a time in the past at which to autopack to.  For
+    # example, you might set up autopack to run once per hour, packing away
+    # all revisions that are older than 4 hours.  Autopack can also be
+    # configured to periodically do a classic pack.
+    #
+    # Classic pack is like autopack -- it packs away old revisions -- but it
+    # also does a mark and sweep through all the known objects, looking for
+    # those that are not root reachable as of the pack time.  Such objects are
+    # also packed away even if they have current revisions in the packable
+    # transactions, because it means that there is no undo operation that can
+    # restore the object's reachability.  Remember that you cannot undo
+    # previous to the latest pack time.
+    #
+    # Both packing strategies do reference counting, and the refcounts are
+    # sums of the refcounts of all revisions, so if an object's refcount goes
+    # to zero, all its object revisions can safely be packed away.
+    #
+    # We try to interleave BerkeleyDB transactions and non-pack-lock
+    # acquisition as granularly as possible so that packing doesn't block
+    # other operations for too long.  But remember we don't use Berkeley locks
+    # so we have to be careful about our application level locks.
     #
+
+    # First, the public API for classic pack
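The objrev-driven collection both flavors share boils down to one scan: walk
the table in newserial order, stop at the pack time, skip creation records
(old serial of all zeros), and pack away every other (oid, oldserial)
revision named.  A toy in-memory rendition of that rule, with plain Python
containers standing in for the Berkeley tables (all names illustrative):

    ZERO = '\0' * 8

    def collect_revs(objrevs, revisions, packtime):
        # objrevs: list of (newserial, oid, oldserial), sorted by newserial
        # revisions: dict mapping oid+serial -> pickle data
        remaining = []
        for newserial, oid, oldserial in objrevs:
            if newserial > packtime:
                remaining.append((newserial, oid, oldserial))
            elif oldserial != ZERO:
                # a superseded revision we can pack away
                del revisions[oid + oldserial]
        return remaining

    # Object '\7' created in txn 1, then modified in txns 2 and 3
    t1, t2, t3 = ['\0' * 7 + chr(n) for n in (1, 2, 3)]
    oid = '\0' * 7 + '\7'
    revs = {oid+t1: 'p1', oid+t2: 'p2', oid+t3: 'p3'}
    table = [(t1, oid, ZERO), (t2, oid, t1), (t3, oid, t2)]
    # Packing to t2 removes only the revision superseded by then
    table = collect_revs(table, revs, t2)
    assert sorted(revs.keys()) == [oid+t2, oid+t3]
    assert table == [(t3, oid, t2)]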
     def pack(self, t, zreferencesf):
         # For all intents and purposes, referencesf here is always going to be
         # the same as ZODB.referencesf.referencesf.  It's too much of a PITA
@@ -1301,9 +1365,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         self._packlock.acquire()
         try:
             # We don't wrap this in _withtxn() because we're going to do the
-            # operation across several Berkeley transactions.  It makes
-            # bookkeeping harder, but it also allows other work to happen
-            # (stores and reads) while packing is being done.
+            # operation across several Berkeley transactions, which allows
+            # other work to happen (stores and reads) while packing is being
+            # done.
             self._dopack(t)
         finally:
             self._packlock.release()
@@ -1321,39 +1385,193 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         packtime = min(t, time.time())
         t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
         packtid = `t0`
-        # Calculate the set of objects reachable from the root.  Anything else
-        # is a candidate for having all their revisions packed away.  The set
-        # of reachable objects lives in the _packmark table.
+        # Collect all revisions of all objects earlier than the pack time.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._collect_revs, packtid)
+        finally:
+            self._lock_release()
+        # Collect any objects with refcount zero.  We do this before the mark
+        # and sweep because we're sharing the oidqueue table for two different
+        # purposes.
         self._lock_acquire()
         try:
-            self._withtxn(self._mark)
+            self._withtxn(self._collect_objs)
         finally:
             self._lock_release()
-        # Now cruise through all the transactions from the pack time forward,
-        # getting rid of any objects not reachable from the root, or any
-        # non-current revisions of reachable objects.
+        # Do a mark and sweep for garbage collection.  Calculate the set of
+        # objects reachable from the root.  Anything else is a candidate for
+        # having all their revisions packed away.  The set of reachable
+        # objects lives in the _packmark table.
         self._lock_acquire()
         try:
-            self._withtxn(self._sweep, end=packtid)
+            self._withtxn(self._mark, packtid)
+        finally:
+            self._lock_release()
+        # Now perform a sweep, using oidqueue to hold all object ids for
+        # objects which are not root reachable as of the pack time.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._sweep, packtid)
         finally:
             self._lock_release()
-        # Now we have the zaptids table which contains a list of all the
-        # transactions tha can get packed away.  So zap 'em.
+        # Once again, collect any objects with refcount zero due to the mark
+        # and sweep garbage collection pass.
         self._lock_acquire()
         try:
-            self._withtxn(self._collect)
+            self._withtxn(self._collect_objs)
         finally:
             self._lock_release()
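Each phase above repeats the same acquire/_withtxn/release dance so loads and
stores can slip in between the pack's Berkeley transactions.  Purely to
illustrate the pattern (this helper does not exist in the module), the five
phases could be driven like so:

    def _phases(self, steps):
        # steps: sequence of (method, args) tuples; each runs in its own
        # Berkeley transaction with the storage lock held only briefly
        for meth, args in steps:
            self._lock_acquire()
            try:
                self._withtxn(meth, *args)
            finally:
                self._lock_release()

    # _dopack() would then read:
    #   self._phases([(self._collect_revs, (packtid,)),
    #                 (self._collect_objs, ()),
    #                 (self._mark, (packtid,)),
    #                 (self._sweep, (packtid,)),
    #                 (self._collect_objs, ())])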
-    def _mark(self, txn):
-        # Find the oids for all the objects reachable from the root.  To
-        # reduce the amount of in-core memory we need do do a pack operation,
-        # we'll save the mark data in the packmark table.  The oidqueue is a
-        # BerkeleyDB Queue that holds the list of object ids to look at next,
-        # and by using this we don't need to keep an in-memory dictionary.
+    def _collect_revs(self, txn, packtid):
+        ct = co = None
+        try:
+            co = self._objrevs.cursor(txn=txn)
+            ct = self._txnoids.cursor(txn=txn)
+            rec = co.first()
+            while rec:
+                revid, oldserial = rec
+                newserial = revid[:8]
+                oid = revid[8:]
+                if newserial > packtid:
+                    break
+                # If the oldserial is ZERO, then this is the first revision
+                # of the object, and thus there is no old revision to pack
+                # away.  We can delete this record from objrevs so we won't
+                # have to deal with it again.  Otherwise, we can remove the
+                # metadata record for this revision and decref the
+                # corresponding pickle.
+                if oldserial <> ZERO:
+                    metadata = self._metadata[oid+oldserial]
+                    self._metadata.delete(oid+oldserial, txn=txn)
+                    # Decref the pickle
+                    self._decrefPickle(oid, metadata[16:24], txn)
+                    # Remove the txnoids entry.  We have to use a cursor here.
+                    ct.set_both(oldserial, oid)
+                    ct.delete()
+                co.delete()
+                rec = co.next()
+        finally:
+            if co: co.close()
+            if ct: ct.close()
+        # Note that before we commit this Berkeley transaction, we also need
+        # to update the packtime table, so we can't have the possibility of a
+        # race condition with undoLog().
+        self._packtime.put(packtid, PRESENT, txn=txn)
+    def _decrefPickle(self, oid, lrevid, txn):
+        if lrevid == DNE:
+            # There is no pickle data
+            return
+        key = oid + lrevid
+        refcount = U64(self._pickleRefcounts.get(key, ZERO)) - 1
+        if refcount <= 0:
+            # We can collect this pickle
+            self._pickleRefcounts.delete(key, txn=txn)
+            data = self._pickles[key]
+            self._pickles.delete(key, txn=txn)
+            deltas = {}
+            self._update(deltas, data, -1)
+            self._decref(deltas, txn)
+        else:
+            self._pickleRefcounts.put(key, p64(refcount), txn=txn)
+
+    def _decref(self, deltas, txn):
+        for oid, delta in deltas.items():
+            refcount = U64(self._refcounts.get(oid, ZERO)) + delta
+            if refcount > 0:
+                self._refcounts.put(oid, p64(refcount), txn=txn)
+            else:
+                # This object is no longer referenced by any other object in
+                # the system.  We can collect all traces of it.
+                self._oidqueue.append(oid, txn)
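Pickle refcounts and object refcounts cascade: collecting a pickle decrefs
every object it referenced, which can push still more oids onto the queue.  A
toy model of the cascade with dicts and a list standing in for the refcounts
tables and the oidqueue (all names illustrative):

    def decref(refcounts, oid, queue):
        refcounts[oid] = refcounts.get(oid, 0) - 1
        if refcounts[oid] <= 0:
            del refcounts[oid]
            queue.append(oid)

    def collect(refcounts, references, queue):
        # references: oid -> oids its current pickle points at
        while queue:
            oid = queue.pop(0)
            for child in references.pop(oid, []):
                decref(refcounts, child, queue)

    refcounts = {'b': 1, 'c': 1}
    references = {'a': ['b'], 'b': ['c'], 'c': []}
    queue = ['a']               # 'a' just became unreferenced
    collect(refcounts, references, queue)
    assert refcounts == {}      # the whole chain was collected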
+    def _collect_objs(self, txn):
+        orec = self._oidqueue.consume()
+        while orec:
+            oid = orec[1]
+            # Delete the object from the serials table
+            c = self._serials.cursor(txn)
+            try:
+                rec = c.set(oid)
+                while rec and rec[0] == oid:
+                    c.delete()
+                    rec = c.next_dup()
+                # We don't need the refcounts any more, but note that if the
+                # object was never referenced from another object, there may
+                # not be a refcounts entry.
+                try:
+                    self._refcounts.delete(oid, txn=txn)
+                except db.DBNotFoundError:
+                    pass
+            finally:
+                c.close()
+            # Collect all metadata records for this object
+            c = self._metadata.cursor(txn)
+            try:
+                rec = c.set_range(oid)
+                while rec and rec[0][:8] == oid:
+                    revid, metadata = rec
+                    tid = revid[8:]
+                    c.delete()
+                    rec = c.next()
+                    self._decrefPickle(oid, metadata[16:24], txn)
+                    # Delete the txnoid entry for this revision
+                    ct = self._txnoids.cursor(txn=txn)
+                    try:
+                        ct.set_both(tid, oid)
+                        ct.delete()
+                    finally:
+                        ct.close()
+                    # Clean up version related tables
+                    vid = metadata[:8]
+                    if vid <> ZERO:
+                        cv = self._currentVersions.cursor(txn=txn)
+                        try:
+                            cv.set_both(vid, revid)
+                            cv.delete()
+                        finally:
+                            cv.close()
+                    # BAW: maybe we want to refcount vids and versions table
+                    # entries, but given the rarity of versions, this
+                    # seems like too much work for too little gain.
+            finally:
+                c.close()
+            # We really do want this down here, since _decrefPickle() could
+            # add more items to the queue.
+            orec = self._oidqueue.consume()
+        assert len(self._oidqueue) == 0
+    def _findrev(self, oid, packtid, txn):
+        # BAW: Maybe this could be more efficient by not doing so much
+        # searching, but it would also be more complicated, so the
+        # tradeoff should be measured.
+        serial = None
+        c = self._metadata.cursor(txn=txn)
+        try:
+            rec = c.set_range(oid)
+            while rec:
+                revid, metadata = rec
+                coid = revid[:8]
+                ctid = revid[8:]
+                if coid <> oid or ctid > packtid:
+                    # We found the end of the metadata records for this
+                    # object prior to the pack time.
+                    break
+                serial = ctid
+                rec = c.next()
+        finally:
+            c.close()
+        return serial
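_findrev() walks the metadata keys (8-byte oid + 8-byte tid) upward from the
first record for the oid and remembers the last tid at or before the pack
time.  Because the keys sort lexicographically, the same answer falls out of a
binary search over one object's sorted serials; a bisect model of the lookup
(names illustrative):

    from bisect import bisect_right

    def findrev(tids, packtid):
        # tids: sorted list of 8-byte serials for a single object
        i = bisect_right(tids, packtid)
        return tids[i-1] if i else None

    tids = ['\0' * 7 + chr(n) for n in (1, 3, 5)]
    assert findrev(tids, '\0' * 7 + '\4') == '\0' * 7 + '\3'
    assert findrev(tids, '\0' * 7 + '\5') == '\0' * 7 + '\5'
    assert findrev(tids, '\0' * 8) is None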
+    def _mark(self, txn, packtid):
+        # Find the oids for all the objects reachable from the root, as of
+        # the pack time.  To reduce the amount of in-core memory we need to
+        # do a pack operation, we'll save the mark data in the packmark
+        # table.  The oidqueue is a BerkeleyDB Queue that holds the list of
+        # object ids to look at next, and by using this we don't need to
+        # keep an in-memory dictionary.
         assert len(self._packmark) == 0
         assert len(self._oidqueue) == 0
-        assert len(self._zaptids) == 0
         # Quick exit for empty storages
         if not self._serials:
             return
@@ -1367,11 +1585,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
                 # We've already seen this object
                 continue
             self._packmark.put(oid, PRESENT, txn=txn)
-            # Get the pickle data for this object's current version
-            serial, tid = self._getSerialAndTidMissingOk(oid)
+            # Get the pickle data for the most current revision of this
+            # object as of the pack time.
+            tid = self._findrev(oid, packtid, txn)
             # Say there's no root object (as is the case in some of the unit
             # tests), and we're looking up oid ZERO.  Then serial will be None.
-            if serial is not None:
+            if tid is not None:
                 lrevid = self._metadata[oid+tid][16:24]
                 data = self._pickles[oid+lrevid]
             # Now get the oids of all the objects referenced by this pickle
@@ -1385,189 +1604,29 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             oid = rec and rec[1]
         assert len(self._oidqueue) == 0
-    def _sweep(self, txn, start=None, end=None):
-        cm = self._txnMetadata.cursor(txn=txn)
+    def _sweep(self, txn, packtid):
+        c = self._serials.cursor(txn=txn)
         try:
-            # Cruise forward through transactions from the first to the pack
-            # time looking for unpacked transactions that have no current
-            # records for their objects.
-            mrec = None
-            if start is not None:
-                mrec = cm.set(start)
-            if mrec is None:
-                mrec = cm.first()
-            while mrec:
-                tid, metadata = mrec
-                if tid > end:
-                    break
-                mrec = cm.next()
-                if metadata[0] == PROTECTED_TRANSACTION:
-                    # This one's already been packed so we can skip it
-                    continue
-                zap = True
-                ct = self._txnoids.cursor(txn=txn)
-                try:
-                    rec = ct.set(tid)
-                    while rec:
-                        ctid, coid = rec
-                        rec = ct.next_dup()
-                        if ctid <> tid:
-                            break
-                        serial, otid = self._getSerialAndTid(coid)
-                        if serial == tid and self._packmark.has_key(coid):
-                            # This transaction matches the current serial
-                            # number for an object that is reachable from the
-                            # root, so we can't pack this transaction.
-                            zap = False
-                            break
-                    if zap:
-                        self._zaptids.append(tid)
-                finally:
-                    ct.close()
+            rec = c.first()
+            while rec:
+                oid = rec[0]
+                rec = c.next()
+                serial, tid = self._getSerialAndTid(oid)
+                # If the current revision of this object is newer than the
+                # packtid, we'll ignore this object since we only care about
+                # root reachability as of the pack time.
+                if tid > packtid:
+                    continue
+                # Otherwise, if packmark (which knows about all the root
+                # reachable objects) doesn't have a record for this guy, then
+                # we can zap it.  Do so by appending to oidqueue.
+                if not self._packmark.has_key(oid):
+                    self._oidqueue.append(oid, txn)
         finally:
-            cm.close()
+            c.close()
         # We're done with the mark table
         self._packmark.truncate(txn=txn)
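The new sweep never touches the transaction log: it walks current objects,
keeps anything whose current revision postdates the pack time, and queues
whatever the mark phase didn't reach.  A compact model of mark-plus-sweep over
plain containers (illustrative names; the real code keeps the mark set in the
packmark table to bound memory):

    def mark(root, references):
        # breadth-first walk from the root; references: oid -> child oids
        seen, queue = set(), [root]
        while queue:
            oid = queue.pop(0)
            if oid in seen:
                continue
            seen.add(oid)
            queue.extend(references.get(oid, []))
        return seen

    def sweep(current, packtid, reachable):
        # current: oid -> tid of its current revision
        return [oid for oid, tid in sorted(current.items())
                if tid <= packtid and oid not in reachable]

    refs = {'root': ['a'], 'a': [], 'orphan': []}
    current = {'root': 1, 'a': 2, 'orphan': 3, 'young': 9}
    doomed = sweep(current, 5, mark('root', refs))
    assert doomed == ['orphan']   # 'young' survives: newer than packtid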
-    def _collect(self, txn):
-        rec = self._zaptids.consume()
-        while rec:
-            tid = rec[1]
-            rec = self._zaptids.consume()
-            c = self._txnoids.cursor(txn)
-            try:
-                trec = c.set(tid)
-                while trec and trec[0] == tid:
-                    oid = trec[1]
-                    # We can get rid of this txnoids entry
-                    c.delete()
-                    trec = c.next_dup()
-                    # Delete the metadata record
-                    metadata = self._metadata[oid+tid]
-                    self._metadata.delete(oid+tid, txn=txn)
-                    # Decref the pickle
-                    self._decrefPickle(oid, metadata[16:24], txn)
-            finally:
-                c.close()
-            # Set the status flag on the transaction metadata for this txn
-            txnmeta = self._txnMetadata[tid]
-            if txnmeta[0] <> PROTECTED_TRANSACTION:
-                txnmeta = PROTECTED_TRANSACTION + txnmeta[1:]
-                self._txnMetadata.put(tid, txnmeta, txn=txn)
-
-    def _decref(self, deltas, txn):
-        for oid, delta in deltas.items():
-            refcount = U64(self._refcounts.get(oid, ZERO)) + delta
-            if refcount > 0:
-                self._refcounts.put(oid, p64(refcount), txn=txn)
-            # This object is no longer referenced by any other object in the
-            # system.  We can collect all traces of it.
-            c = self._serials.cursor(txn)
-            try:
-                rec = c.set(oid)
-                while rec and rec[0] == oid:
-                    c.delete()
-                    rec = c.next_dup()
-            finally:
-                c.close()
-            # Collect all metadata record that reference this object
-            c = self._metadata.cursor(txn)
-            try:
-                rec = c.set_range(oid)
-                while rec and rec[:8] == oid:
-                    revid, metadata = rec
-                    c.delete()
-                    rec = c.next()
-                    self._decrefPickle(oid, metadata[16:24], txn)
-            finally:
-                c.close()
-
-    def _decrefPickle(self, oid, lrevid, txn):
-        if lrevid == DNE:
-            # There is no pickle data
-            return
-        key = oid + lrevid
-        refcount = U64(self._pickleRefcounts.get(key, ZERO)) - 1
-        if refcount <= 0:
-            # We can collect this pickle
-            self._pickleRefcounts.delete(key, txn=txn)
-            data = self._pickles[key]
-            self._pickles.delete(key, txn=txn)
-            deltas = {}
-            self._update(deltas, data, -1)
-            self._decref(deltas, txn)
-        else:
-            self._pickleRefcounts.put(p64(refcount), txn=txn)
-    #
-    # GCable interface, for cyclic garbage collection (untested)
-    #
-    def gcTrash(oids):
-        """Given a list of oids, treat them as trash.
-
-        This means they can be garbage collected, with all necessary cascading
-        reference counting performed
-        """
-        # BAW: this is broken -- make it look like the end of pack()
-        self._lock_acquire()
-        c = None
-        try:
-            c = self._metadata.cursor()
-            for oid in oids:
-                # Convert to a string
-                oid = p64(oid)
-                # Delete all the metadata records
-                rec = c.set(oid)
-                while rec:
-                    key, data = rec
-                    rec = c.next_dup()
-                    self._zaprevision(key)
-        finally:
-            if c:
-                c.close()
-            self._lock_release()
-
-    def gcRefcount(oid):
-        """Return the reference count of the specified object.
-
-        Raises KeyError if there is no object with oid.  Both the oid argument
-        and the returned reference count are integers.
-        """
-        self._lock_acquire()
-        try:
-            return U64(self._refcounts[p64(oid)])
-        finally:
-            self._lock_release()
-
-    def gcReferences(oid):
-        """Return a list of oids that the specified object refers to.
-
-        Raises KeyError if there is no object with oid.  The oid argument
-        is an integer; the return value is a list of integers of oids.
-        """
-        oids = []
-        c = None
-        self._lock_acquire()
-        try:
-            c = self._pickles.cursor()
-            rec = c.set(p64(oid))
-            while rec:
-                # We don't care about the key
-                pickle = rec[1]
-                rec = c.next_dup()
-                # Sniff the pickle for object references
-                tmpoids = []
-                referencesf(pickle, tmpoids)
-                # Convert to unsigned longs
-                oids.extend(map(U64, tmpoids))
-            # Make sure there's no duplicates, and convert to int
-            return oids
-        finally:
-            if c:
-                c.close()
-            self._lock_release()
     #
     # Iterator protocol
     #
@@ -1602,12 +1661,16 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             tid, data = rec
             # Now unpack the necessary information.  Don't impedance match
             # the status flag (that's done by the caller).
-            status = data[0]
-            userlen, desclen = unpack('>II', data[1:9])
-            user = data[9:9+userlen]
-            desc = data[9+userlen:9+userlen+desclen]
-            ext = data[9+userlen+desclen:]
-            return tid, status, user, desc, ext
+            packtime = self._last_packtime()
+            if tid <= packtime:
+                packedp = True
+            else:
+                packedp = False
+            userlen, desclen = unpack('>II', data[:8])
+            user = data[8:8+userlen]
+            desc = data[8+userlen:8+userlen+desclen]
+            ext = data[8+userlen+desclen:]
+            return tid, packedp, user, desc, ext
         finally:
             if c:
                 c.close()
@@ -1678,14 +1741,14 @@ class _TransactionsIterator(_GetItemBase):
         if self._closed:
             raise IOError, 'iterator is closed'
         # Let IndexErrors percolate up.
-        tid, status, user, desc, ext = self._storage._nexttxn(
+        tid, packedp, user, desc, ext = self._storage._nexttxn(
             self._tid, self._first)
         self._first = False
         # Did we reach the specified end?
         if self._stop is not None and tid > self._stop:
             raise IndexError
         self._tid = tid
-        return _RecordsIterator(self._storage, tid, status, user, desc, ext)
+        return _RecordsIterator(self._storage, tid, packedp, user, desc, ext)

     def close(self):
         self._closed = True
@@ -1715,14 +1778,14 @@ class _RecordsIterator(_GetItemBase):
     description = None
     _extension = None

-    def __init__(self, storage, tid, status, user, desc, ext):
+    def __init__(self, storage, tid, packedp, user, desc, ext):
         self._storage = storage
         self.tid = tid
         # Impedance matching
-        if status == UNDOABLE_TRANSACTION:
-            self.status = ' '
-        else:
+        if packedp:
             self.status = 'p'
+        else:
+            self.status = ' '
         self.user = user
         self.description = desc
         self._extension = ext