Commit 9efc9849 authored by Barry Warsaw

Support for autopacking in a separate thread.  BerkeleyConfig now has
three new configuration variables for controlling how autopacking
works.  Basically, you set an autopack frequency, a "packtime" -- the
point in the past you want to pack to -- and a counter for
automatically doing a classic pack.
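
As a rough illustration, here is how these options might be wired up.  The
import paths and the Full() constructor arguments below are assumptions, not
taken from this change; only the frequency, packtime, and classicpack
attribute names are the ones read by _setupDBs() in the diff:

    # Hypothetical sketch -- import locations and Full() arguments are assumed.
    from bsddb3Storage.BerkeleyBase import BerkeleyConfig
    from bsddb3Storage.Full import Full

    config = BerkeleyConfig()
    config.frequency = 3600        # attempt an autopack once an hour (seconds)
    config.packtime = 4 * 3600     # pack away revisions older than four hours
    config.classicpack = 24        # every 24th autopack also does a classic pack

    storage = Full('full', config=config)
    # ... normal storage use; autopacking happens in a background thread ...
    storage.close()                # stops and joins the autopack thread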

Specific changes here include:

_setupDBs(): If autopacking is enabled, create the autopacking thread
object and get it started.

close(): When shutting down the storage, we need to stop and join the
autopacking thread, otherwise I think we have a high possibility of
corrupting our database (requiring recovery).

_dopack(): Add a flag for whether full gc should be done or not.
That's about the only difference between classic pack and autopack
(the latter does not do gc).

autopack(): The method that the autopacking thread calls to start an
autopack.  It takes a pack time with the same semantics as pack(), but
it also takes a flag specifying whether to do garbage collection of
unreachable objects or not.

_Autopack: A derived class of threading.Thread that handles background
autopacking.
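
To make the classicpack counter concrete, here is a tiny simulation of the
modulo bookkeeping that _Autopack.run() performs (see the diff below); the
values are made up for illustration:

    # With classicpack = 4, every fourth autopack run is also a classic
    # (garbage-collecting) pack; the others skip gc.
    classicpack = 4
    lastclassic = 0
    for run in range(1, 9):
        lastclassic = (lastclassic + 1) % classicpack
        classicp = not lastclassic            # true on runs 4 and 8
        print 'run %d: classic pack? %s' % (run, classicp and 'yes' or 'no')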
parent d1e0435a
@@ -15,10 +15,11 @@
 """Berkeley storage with full undo and versioning support.
 """
-__version__ = '$Revision: 1.47 $'.split()[-2:][0]
+__version__ = '$Revision: 1.48 $'.split()[-2:][0]
 import sys
 import time
+import threading
 import cPickle as pickle
 from struct import pack, unpack
@@ -51,6 +52,12 @@ DNE = '\377'*8
 # DEBUGGING
 #DNE = 'nonexist'
+# Number of seconds for the autopack thread to sleep before checking to see if
+# it's time for another autopack run. Lower numbers mean more processing,
+# higher numbers mean less responsiveness to shutdown requests. 10 seconds
+# seems like a good compromise.
+AUTOPACK_CHECK_SLEEP = 10
 try:
     # Python 2.2
     from _helper import incr
@@ -79,6 +86,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         """
         self._packlock = ThreadLock.allocate_lock()
         BerkeleyBase.__init__(self, name, env, prefix, config)
+        # The autopack thread is started in _setupDBs() because we need
+        # information in one of the tables.

     def _setupDBs(self):
         # Data Type Assumptions:
@@ -252,9 +261,18 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
                                  db.DB_QUEUE, db.DB_CREATE)
         # Do recovery and consistency checks
         self._withlock(self._dorecovery)
-        # DEBUGGING
-        #self._nextserial = 0L
-        # END DEBUGGING
+        # Set up the autopacking thread
+        if self._config.frequency <= 0:
+            # No autopacking
+            self._autopacker = None
+        else:
+            config = self._config
+            lastpacktime = U64(self._last_packtime())
+            self._autopacker = _Autopack(
+                self, config.frequency,
+                config.packtime, config.classicpack,
+                lastpacktime)
+            self._autopacker.start()

     def _dorecovery(self):
         # If these tables are non-empty, it means we crashed during a pack
@@ -290,6 +308,16 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         self.__nextvid = 0L

     def close(self):
+        # We must stop the autopacker first before closing any tables. BAW:
+        # should we use a timeout on the join() call? I'm not sure. On the
+        # one hand we don't want to block forever, but on the other, killing
+        # the autopacker thread in the middle of real work could leave the
+        # databases in a corrupted state, requiring recovery. With an
+        # AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
+        if self._autopacker:
+            zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
+            self._autopacker.stop()
+            self._autopacker.join()
         self._serials.close()
         self._pickles.close()
         self._refcounts.close()
@@ -482,10 +510,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         self._txnMetadata.put(tid, data, txn=txn)

     def _begin(self, tid, u, d, e):
-        # DEBUGGING
-        #self._nextserial += 1
-        #self._serial = p64(self._nextserial)
-        # END DEBUGGING
         self._withtxn(self._dobegin, self._serial, u, d, e)

     def _finish(self, tid, u, d, e):
@@ -1359,7 +1383,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         # to pass that around to the helper methods, so just assert they're
         # the same.
         assert zreferencesf == referencesf
-        zLOG.LOG('Full storage', zLOG.INFO, 'pack started')
+        zLOG.LOG('Full storage', zLOG.INFO, 'classic pack started')
         # A simple wrapper around the bulk of packing, but which acquires a
         # lock that prevents multiple packs from running at the same time.
         self._packlock.acquire()
@@ -1371,13 +1395,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             self._dopack(t)
         finally:
             self._packlock.release()
-        zLOG.LOG('Full storage', zLOG.INFO, 'pack done')
+        zLOG.LOG('Full storage', zLOG.INFO, 'classic pack finished')

-    def _dopack(self, t):
+    def _dopack(self, t, gc=True):
         # t is a TimeTime, or time float, convert this to a TimeStamp object,
         # using an algorithm similar to what's used in FileStorage. We know
-        # that our transaction ids, a.k.a. revision ids, are timestamps. BAW:
-        # This doesn't play nicely if you enable the `debugging tids'
+        # that our transaction ids, a.k.a. revision ids, are timestamps.
         #
         # BAW: should a pack time in the future be a ValueError? We'd have to
         # worry about clock skew, so for now, we just set the pack time to the
@@ -1399,6 +1422,9 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
             self._withtxn(self._collect_objs)
         finally:
             self._lock_release()
+        # If we're not doing a classic pack, we're done.
+        if not gc:
+            return
         # Do a mark and sweep for garbage collection. Calculate the set of
         # objects reachable from the root. Anything else is a candidate for
         # having all their revisions packed away. The set of reachable
@@ -1423,6 +1449,23 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         finally:
             self._lock_release()

+    def autopack(self, t, gc):
+        zLOG.LOG('Full storage', zLOG.INFO,
+                 'autopack started (packtime: %s, gc? %s)'
+                 % (t, gc and 'yes' or 'no'))
+        # A simple wrapper around the bulk of packing, but which acquires a
+        # lock that prevents multiple packs from running at the same time.
+        self._packlock.acquire()
+        try:
+            # We don't wrap this in _withtxn() because we're going to do the
+            # operation across several Berkeley transactions, which allows
+            # other work to happen (stores and reads) while packing is being
+            # done.
+            self._dopack(t, gc)
+        finally:
+            self._packlock.release()
+        zLOG.LOG('Full storage', zLOG.INFO, 'autopack finished')
+
     def _collect_revs(self, txn, packtid):
         ct = co = None
         try:
@@ -1826,3 +1869,42 @@ class _Record:
         self.version = version
         self.data = data
         self.data_txn = data_txn
+
+
+class _Autopack(threading.Thread):
+    def __init__(self, storage, frequency, packtime, classicpack,
+                 lastpacktime):
+        threading.Thread.__init__(self)
+        self._storage = storage
+        self._frequency = frequency
+        self._packtime = packtime
+        self._classicpack = classicpack
+        # Bookkeeping
+        self._stop = False
+        self._nextpack = lastpacktime + self._frequency
+        self._lastclassic = 0
+
+    def run(self):
+        zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread started')
+        while not self._stop:
+            now = time.time()
+            if now > self._nextpack:
+                # Should we do a classic pack this time?
+                if self._classicpack <= 0:
+                    classicp = False
+                else:
+                    v = (self._lastclassic + 1) % self._classicpack
+                    self._lastclassic = v
+                    classicp = not v
+                # Run the autopack phase
+                self._storage.autopack(now - self._packtime, classicp)
+                self._nextpack = now + self._frequency
+            # Now we sleep for a little while before we check again. Sleep
+            # for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so
+            # as to be as responsive as possible to .stop() calls.
+            time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
+        zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread finished')
+
+    def stop(self):
+        self._stop = True