Commit 71f1008d authored by Barry Warsaw's avatar Barry Warsaw

_setupDBs(): The oidqueue table used to do double-duty, both during

the mark phase and during the sweep phase.  But this doesn't play
nicely with the escape hatch for pack, since if we raise a PackStop,
it's fine if we truncate the mark queue but not if we truncate the
sweep queue.  So the latter is now separated into the delqueue table.

Also, use the new extended _setupDB() arguments for the DB_QUEUE
tables.

close(): We can really simplify this and make it more robust for when
we add new tables, by relying on the fact that the base class
maintains its own list of opened tables, and the base class close()
method closes them in turn.

autopack(): Default the gc argument to False.

_collect_revs(), _collect_objs(), _mark(), _sweep(): Add an escape
hatch for the pack operation inside the inner loops of each of these
methods.  That way, we don't have to wait until the loops are finished
to exit the pack operation, if stop() has been requested by the main
thread.

AUTOPACK_CHECK_SLEEP removed.

_Autopack: Use the _WorkThread base class.
parent b4488779
......@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support.
"""
__version__ = '$Revision: 1.53 $'.split()[-2:][0]
__version__ = '$Revision: 1.54 $'.split()[-2:][0]
import sys
import time
......@@ -39,7 +39,7 @@ import zLOG
# the Full and Minimal implementations. It in turn inherits from
# ZODB.BaseStorage.BaseStorage which itself provides some common storage
# functionality.
from BerkeleyBase import BerkeleyBase
from BerkeleyBase import BerkeleyBase, PackStop, _WorkThread
ABORT = 'A'
COMMIT = 'C'
......@@ -51,12 +51,6 @@ DNE = '\377'*8
# DEBUGGING
#DNE = 'nonexist'
# Number of seconds for the autopack thread to sleep before checking to see if
# it's time for another autopack run. Lower numbers mean more processing,
# higher numbers mean less responsiveness to shutdown requests. 10 seconds
# seems like a good compromise.
AUTOPACK_CHECK_SLEEP = 10
try:
# Python 2.2
from _helper import incr
......@@ -208,6 +202,11 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# object is first written to a version, no entry is written here.
# We do write an entry when we commit or abort the version.
#
# delqueue -- [oid]
# This is also a Queue, not a BTree. It is used during pack to
# list objects for which no more references exist, such that the
# objects can be completely packed away.
#
# packmark -- [oid]
# Every object reachable from the root during a classic pack
# operation will have its oid present in this table.
......@@ -215,9 +214,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# oidqueue -- [oid]
# This table is a Queue, not a BTree. It is used during the mark
# phase of pack() and contains a list of oids for work to be done.
# It is also used during pack to list objects for which no more
# references exist, such that the objects can be completely packed
# away.
#
self._serials = self._setupDB('serials', db.DB_DUP)
self._pickles = self._setupDB('pickles')
......@@ -239,16 +235,13 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._objrevs = self._setupDB('objrevs', db.DB_DUP)
self._packmark = self._setupDB('packmark')
self._packtime = self._setupDB('packtime')
self._oidqueue = db.DB(self._env)
self._oidqueue.set_re_len(8)
# BAW: do we need to set the queue extent size?
self._oidqueue.open(self._prefix + 'oidqueue',
db.DB_QUEUE, db.DB_CREATE)
self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 8)
self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
# Do recovery and consistency checks
self._withlock(self._dorecovery)
# Set up the autopacking thread
if self._config.frequency > 0:
config = self._config
config = self._config
if config.frequency > 0:
lastpacktime = U64(self._last_packtime())
self._autopacker = _Autopack(
self, config.frequency,
......@@ -304,35 +297,22 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self.__ltid = ZERO
def close(self):
# We must stop the autopacker first before closing any tables. BAW:
# should we use a timeout on the join() call? I'm not sure. On the
# one hand we don't want to block forever, but on the other, killing
# the autopacker thread in the middle of real work could leave the
# databases in a corrupted state, requiring recovery. With a
# AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
if self._autopacker:
zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join()
self._serials.close()
self._pickles.close()
self._refcounts.close()
self._oids.close()
self._pvids.close()
self._prevrevids.close()
self._pending.close()
self._vids.close()
self._versions.close()
self._currentVersions.close()
self._metadata.close()
self._txnMetadata.close()
self._txnoids.close()
self._pickleRefcounts.close()
self._objrevs.close()
self._packtime.close()
self._packmark.close()
self._oidqueue.close()
BerkeleyBase.close(self)
# Set this flag before acquiring the lock so we don't block waiting
# for the autopack thread to give up the lock.
self._stop = True
self._lock_acquire()
try:
# We must stop the autopacker and checkpointing threads first
# before closing any tables. I'm not sure about the join()
# timeout, but I'd be surprised if any particular iteration of a
# pack-related loops take longer than a few seconds.
if self._autopacker:
zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join(30)
BerkeleyBase.close(self)
finally:
self._lock_release()
def _doabort(self, txn, tid):
# First clean up the oid indexed (or oid+tid indexed) tables.
......@@ -1401,9 +1381,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._withtxn(self._collect_revs, packtid)
finally:
self._lock_release()
# Collect any objects with refcount zero. We do this before the mark
# and sweep because we're sharing the oidqueue table for two different
# purposes.
# Collect any objects with refcount zero.
self._lock_acquire()
try:
self._withtxn(self._collect_objs)
......@@ -1436,7 +1414,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
finally:
self._lock_release()
def autopack(self, t, gc):
def autopack(self, t, gc=False):
"""Perform an autopack pass.
Autopacking is different than classic pack() in that it doesn't do
cyclic garbage detection unless the gc flag is True.
"""
zLOG.LOG('Full storage', zLOG.INFO,
'autopack started (packtime: %s, gc? %s)'
% (t, gc and 'yes' or 'no'))
......@@ -1460,6 +1443,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
ct = self._txnoids.cursor(txn=txn)
rec = co.first()
while rec:
if self._stop:
raise PackStop, 'stopped in _collect_revs()'
revid, oldserial = rec
newserial = revid[:8]
oid = revid[8:]
......@@ -1526,11 +1511,13 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
else:
# This object is no longer referenced by any other object in
# the system. We can collect all traces of it.
self._oidqueue.append(oid, txn)
self._delqueue.append(oid, txn)
def _collect_objs(self, txn):
orec = self._oidqueue.consume()
orec = self._delqueue.consume()
while orec:
if self._stop:
raise PackStop, 'stopped in _collect_objs()'
oid = orec[1]
# Delete the object from the serials table
c = self._serials.cursor(txn)
......@@ -1540,6 +1527,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
except db.DBNotFoundError:
rec = None
while rec and rec[0] == oid:
if self._stop:
raise PackStop, 'stopped in _collect_objs() loop 1'
c.delete()
rec = c.next_dup()
# We don't need the refcounts any more, but note that if the
......@@ -1559,6 +1548,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
except db.DBNotFoundError:
rec = None
while rec and rec[0][:8] == oid:
if self._stop:
raise PackStop, 'stopped in _collect_objs() loop 2'
revid, metadata = rec
tid = revid[8:]
c.delete()
......@@ -1587,8 +1578,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
c.close()
# We really do want this down here, since _decrefPickle() could
# add more items to the queue.
orec = self._oidqueue.consume()
assert len(self._oidqueue) == 0
orec = self._delqueue.consume()
assert len(self._delqueue) == 0
def _findrev(self, oid, packtid, txn):
# BAW: Maybe this could probably be more efficient by not doing so
......@@ -1630,6 +1621,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# root references, and then for each of those, find all the objects it
# references, and so on until we've traversed the entire object graph.
while oid:
if self._stop:
raise PackStop, 'stopped in _mark()'
if not self._packmark.has_key(oid):
# We haven't seen this object yet
self._packmark.put(oid, PRESENT, txn=txn)
......@@ -1659,6 +1652,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
try:
rec = c.first()
while rec:
if self._stop:
raise PackStop, 'stopped in _sweep()'
oid = rec[0]
rec = c.next()
serial, tid = self._getSerialAndTid(oid)
......@@ -1671,7 +1666,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# reachable objects) doesn't have a record for this guy, then
# we can zap it. Do so by appending to oidqueue.
if not self._packmark.has_key(oid):
self._oidqueue.append(oid, txn)
self._delqueue.append(oid, txn)
finally:
c.close()
# We're done with the mark table
......@@ -1878,39 +1873,23 @@ class _Record:
class _Autopack(threading.Thread):
class _Autopack(_WorkThread):
def __init__(self, storage, frequency, packtime, classicpack,
lastpacktime):
threading.Thread.__init__(self)
self._storage = storage
self._frequency = frequency
_WorkThread.__init__(self, storage, frequency, 'autopacking')
self._packtime = packtime
self._classicpack = classicpack
# Bookkeeping
self._stop = False
self._nextpack = lastpacktime + self._frequency
self._lastclassic = 0
def run(self):
zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread started')
while not self._stop:
now = time.time()
if now > self._nextpack:
# Should we do a classic pack this time?
if self._classicpack <= 0:
classicp = False
else:
v = (self._lastclassic + 1) % self._classicpack
self._lastclassic = v
classicp = not v
# Run the autopack phase
self._storage.autopack(now - self._packtime, classicp)
self._nextpack = now + self._frequency
# Now we sleep for a little while before we check again. Sleep
# for the minimum of self._frequency and AUTOPACK_CHECK_SLEEPso as
# to be as responsive as ossible to .stop() calls.
time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread finished')
def stop(self):
self._stop = True
def _dowork(self, now):
# Should we do a classic pack this time?
if self._classicpack <= 0:
classicp = False
else:
v = (self._lastclassic + 1) % self._classicpack
self._lastclassic = v
classicp = not v
# Run the autopack phase
self._storage.autopack(now - self._packtime, classicp)
......@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support.
"""
__version__ = '$Revision: 1.53 $'.split()[-2:][0]
__version__ = '$Revision: 1.54 $'.split()[-2:][0]
import sys
import time
......@@ -39,7 +39,7 @@ import zLOG
# the Full and Minimal implementations. It in turn inherits from
# ZODB.BaseStorage.BaseStorage which itself provides some common storage
# functionality.
from BerkeleyBase import BerkeleyBase
from BerkeleyBase import BerkeleyBase, PackStop, _WorkThread
ABORT = 'A'
COMMIT = 'C'
......@@ -51,12 +51,6 @@ DNE = '\377'*8
# DEBUGGING
#DNE = 'nonexist'
# Number of seconds for the autopack thread to sleep before checking to see if
# it's time for another autopack run. Lower numbers mean more processing,
# higher numbers mean less responsiveness to shutdown requests. 10 seconds
# seems like a good compromise.
AUTOPACK_CHECK_SLEEP = 10
try:
# Python 2.2
from _helper import incr
......@@ -208,6 +202,11 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# object is first written to a version, no entry is written here.
# We do write an entry when we commit or abort the version.
#
# delqueue -- [oid]
# This is also a Queue, not a BTree. It is used during pack to
# list objects for which no more references exist, such that the
# objects can be completely packed away.
#
# packmark -- [oid]
# Every object reachable from the root during a classic pack
# operation will have its oid present in this table.
......@@ -215,9 +214,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# oidqueue -- [oid]
# This table is a Queue, not a BTree. It is used during the mark
# phase of pack() and contains a list of oids for work to be done.
# It is also used during pack to list objects for which no more
# references exist, such that the objects can be completely packed
# away.
#
self._serials = self._setupDB('serials', db.DB_DUP)
self._pickles = self._setupDB('pickles')
......@@ -239,16 +235,13 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._objrevs = self._setupDB('objrevs', db.DB_DUP)
self._packmark = self._setupDB('packmark')
self._packtime = self._setupDB('packtime')
self._oidqueue = db.DB(self._env)
self._oidqueue.set_re_len(8)
# BAW: do we need to set the queue extent size?
self._oidqueue.open(self._prefix + 'oidqueue',
db.DB_QUEUE, db.DB_CREATE)
self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 8)
self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
# Do recovery and consistency checks
self._withlock(self._dorecovery)
# Set up the autopacking thread
if self._config.frequency > 0:
config = self._config
config = self._config
if config.frequency > 0:
lastpacktime = U64(self._last_packtime())
self._autopacker = _Autopack(
self, config.frequency,
......@@ -304,35 +297,22 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self.__ltid = ZERO
def close(self):
# We must stop the autopacker first before closing any tables. BAW:
# should we use a timeout on the join() call? I'm not sure. On the
# one hand we don't want to block forever, but on the other, killing
# the autopacker thread in the middle of real work could leave the
# databases in a corrupted state, requiring recovery. With a
# AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
if self._autopacker:
zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join()
self._serials.close()
self._pickles.close()
self._refcounts.close()
self._oids.close()
self._pvids.close()
self._prevrevids.close()
self._pending.close()
self._vids.close()
self._versions.close()
self._currentVersions.close()
self._metadata.close()
self._txnMetadata.close()
self._txnoids.close()
self._pickleRefcounts.close()
self._objrevs.close()
self._packtime.close()
self._packmark.close()
self._oidqueue.close()
BerkeleyBase.close(self)
# Set this flag before acquiring the lock so we don't block waiting
# for the autopack thread to give up the lock.
self._stop = True
self._lock_acquire()
try:
# We must stop the autopacker and checkpointing threads first
# before closing any tables. I'm not sure about the join()
# timeout, but I'd be surprised if any particular iteration of a
# pack-related loops take longer than a few seconds.
if self._autopacker:
zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join(30)
BerkeleyBase.close(self)
finally:
self._lock_release()
def _doabort(self, txn, tid):
# First clean up the oid indexed (or oid+tid indexed) tables.
......@@ -1401,9 +1381,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._withtxn(self._collect_revs, packtid)
finally:
self._lock_release()
# Collect any objects with refcount zero. We do this before the mark
# and sweep because we're sharing the oidqueue table for two different
# purposes.
# Collect any objects with refcount zero.
self._lock_acquire()
try:
self._withtxn(self._collect_objs)
......@@ -1436,7 +1414,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
finally:
self._lock_release()
def autopack(self, t, gc):
def autopack(self, t, gc=False):
"""Perform an autopack pass.
Autopacking is different than classic pack() in that it doesn't do
cyclic garbage detection unless the gc flag is True.
"""
zLOG.LOG('Full storage', zLOG.INFO,
'autopack started (packtime: %s, gc? %s)'
% (t, gc and 'yes' or 'no'))
......@@ -1460,6 +1443,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
ct = self._txnoids.cursor(txn=txn)
rec = co.first()
while rec:
if self._stop:
raise PackStop, 'stopped in _collect_revs()'
revid, oldserial = rec
newserial = revid[:8]
oid = revid[8:]
......@@ -1526,11 +1511,13 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
else:
# This object is no longer referenced by any other object in
# the system. We can collect all traces of it.
self._oidqueue.append(oid, txn)
self._delqueue.append(oid, txn)
def _collect_objs(self, txn):
orec = self._oidqueue.consume()
orec = self._delqueue.consume()
while orec:
if self._stop:
raise PackStop, 'stopped in _collect_objs()'
oid = orec[1]
# Delete the object from the serials table
c = self._serials.cursor(txn)
......@@ -1540,6 +1527,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
except db.DBNotFoundError:
rec = None
while rec and rec[0] == oid:
if self._stop:
raise PackStop, 'stopped in _collect_objs() loop 1'
c.delete()
rec = c.next_dup()
# We don't need the refcounts any more, but note that if the
......@@ -1559,6 +1548,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
except db.DBNotFoundError:
rec = None
while rec and rec[0][:8] == oid:
if self._stop:
raise PackStop, 'stopped in _collect_objs() loop 2'
revid, metadata = rec
tid = revid[8:]
c.delete()
......@@ -1587,8 +1578,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
c.close()
# We really do want this down here, since _decrefPickle() could
# add more items to the queue.
orec = self._oidqueue.consume()
assert len(self._oidqueue) == 0
orec = self._delqueue.consume()
assert len(self._delqueue) == 0
def _findrev(self, oid, packtid, txn):
# BAW: Maybe this could probably be more efficient by not doing so
......@@ -1630,6 +1621,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# root references, and then for each of those, find all the objects it
# references, and so on until we've traversed the entire object graph.
while oid:
if self._stop:
raise PackStop, 'stopped in _mark()'
if not self._packmark.has_key(oid):
# We haven't seen this object yet
self._packmark.put(oid, PRESENT, txn=txn)
......@@ -1659,6 +1652,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
try:
rec = c.first()
while rec:
if self._stop:
raise PackStop, 'stopped in _sweep()'
oid = rec[0]
rec = c.next()
serial, tid = self._getSerialAndTid(oid)
......@@ -1671,7 +1666,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# reachable objects) doesn't have a record for this guy, then
# we can zap it. Do so by appending to oidqueue.
if not self._packmark.has_key(oid):
self._oidqueue.append(oid, txn)
self._delqueue.append(oid, txn)
finally:
c.close()
# We're done with the mark table
......@@ -1878,39 +1873,23 @@ class _Record:
class _Autopack(threading.Thread):
class _Autopack(_WorkThread):
def __init__(self, storage, frequency, packtime, classicpack,
lastpacktime):
threading.Thread.__init__(self)
self._storage = storage
self._frequency = frequency
_WorkThread.__init__(self, storage, frequency, 'autopacking')
self._packtime = packtime
self._classicpack = classicpack
# Bookkeeping
self._stop = False
self._nextpack = lastpacktime + self._frequency
self._lastclassic = 0
def run(self):
zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread started')
while not self._stop:
now = time.time()
if now > self._nextpack:
# Should we do a classic pack this time?
if self._classicpack <= 0:
classicp = False
else:
v = (self._lastclassic + 1) % self._classicpack
self._lastclassic = v
classicp = not v
# Run the autopack phase
self._storage.autopack(now - self._packtime, classicp)
self._nextpack = now + self._frequency
# Now we sleep for a little while before we check again. Sleep
# for the minimum of self._frequency and AUTOPACK_CHECK_SLEEPso as
# to be as responsive as ossible to .stop() calls.
time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread finished')
def stop(self):
self._stop = True
def _dowork(self, now):
# Should we do a classic pack this time?
if self._classicpack <= 0:
classicp = False
else:
v = (self._lastclassic + 1) % self._classicpack
self._lastclassic = v
classicp = not v
# Run the autopack phase
self._storage.autopack(now - self._packtime, classicp)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment