Commit 6e8fbfb4 authored by Barry Warsaw

Rework the background threads to be poll-with-timeout based instead of
sleep based.  We create two ends of a pipe in the parent thread, with
the child thread polling/reading one end and the parent writing to the
other.  The only thing written to this pipe is a "stop marker" -- just
a string that wakes the thread up immediately when we're closing the
storage.
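
The pattern is the classic self-pipe trick.  Here is a minimal,
self-contained sketch of the mechanism (illustrative names only, not
the storage code; the real code is Python 2 era, but this sketch also
runs on Python 3):

    import os
    import select
    import threading

    rfd, wfd = os.pipe()
    poller = select.poll()
    poller.register(rfd, select.POLLIN)
    stop = False

    def worker():
        while not stop:
            # Wait up to 10 seconds, but wake immediately if data
            # arrives; select.poll() timeouts are in milliseconds.
            for fd, event in poller.poll(10 * 1000):
                if event & select.POLLIN:
                    os.read(fd, 1024)  # drain and discard the marker
            # ... periodic work would go here ...

    t = threading.Thread(target=worker)
    t.start()
    # Shutdown: set the flag first, then wake the worker immediately.
    stop = True
    os.write(wfd, b'STOP')
    t.join()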

The primary reason for this is to speed up shutdown: we no longer have
to wait for the next tick of the sleep counter before the thread
shutdown triggers.  This makes shutting down Zope using this storage
much quicker.

Specific changes include:

BerkeleyBase.py

    SLEEP_TIME -> JOIN_TIME, since it's now just the interval we'll
    wait for thread.join() to complete.

    __init__(): Create both the checkpointer thread and the autopacker
    thread, set up the pipes, and get them both rolling.  We refactor
    creation of the autopacker instance into a separate overridable
    method, since this is the one bit that differs between the two
    storages.

    _make_autopacker(): Intended to be overridden.

    close(), _doclose(): Move the thread shutdown code out of the
    lock, since we don't want to risk deadlocking the shutdown of the
    background thread.  This could happen if close() were entered
    while the autopacker thread was between lock acquisitions in
    _dopack().  Also, be sure to write to the pipe so the child
    threads wake up immediately.
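
    To make the lock ordering concrete, here is a runnable toy version
    of the hazard (hedged: it uses a threading.Event in place of the
    stop flag and pipe, and none of the real storage names):

        import threading

        lock = threading.Lock()    # stands in for the storage lock
        done = threading.Event()   # stands in for the stop flag + pipe

        def worker():
            while not done.is_set():
                with lock:
                    pass           # one unit of background work
                done.wait(0.1)     # stand-in for the poll-with-timeout

        t = threading.Thread(target=worker)
        t.start()
        # Correct shutdown: signal and join *before* taking the lock.
        # If close() held `lock' while joining, a worker blocked on
        # `lock' could never finish its cycle, and join() would wait
        # forever.
        done.set()
        t.join()
        with lock:
            pass                   # now safe to tear everything down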

    env_from_string(): Wrap the actual creation and opening of the
    environment in a try/except, so that if there's a failure, we can
    be sure to give up the file lock.
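
    The shape of that fix, as a standalone sketch (the helper and its
    arguments are hypothetical; the real code opens a Berkeley DB
    environment via db.DBEnv()):

        def open_with_lock(lockpath, opener):
            lockfile = open(lockpath, 'w+')  # file lock held from here
            try:
                env = opener()     # may raise during open/recovery
            except:
                lockfile.close()   # give up the file lock, then
                raise              # let the error propagate
            return env, lockfile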

    _WorkThread class: Accept a poll object that wraps the read end of
    the pipe.  Rework run() to do a poll-with-timeout instead of a
    sleep.  I don't think we strictly need to do the fd read given the
    simplicity (and mono-commandity) of the protocol, but it can't
    hurt.  The _dowork() method's signature no longer takes the `now'
    variable.
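
    One unit gotcha worth calling out: select.poll() takes its timeout
    in milliseconds, while the old time.sleep() took seconds, hence
    the `self._interval * 1000' conversion in run().  A tiny runnable
    check:

        import select
        import time

        poller = select.poll()  # nothing registered: poll() just waits
        start = time.time()
        poller.poll(250)        # timeout is in MILLISECONDS -> ~0.25s
        assert 0.2 <= time.time() - start < 1.0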

Full.py

    _make_autopacker(): Override base class to return storage specific
    _Autopack instance.

Minimal.py

    Same, but also include some code cleanup.


Also, get rid of some unnecessary imports.
parent 680fb65b

Full.py

@@ -15,11 +15,9 @@
 """Berkeley storage with full undo and versioning support.
 """
-__version__ = '$Revision: 1.56 $'.split()[-2:][0]
+__version__ = '$Revision: 1.57 $'.split()[-2:][0]

-import sys
 import time
-import threading
 import cPickle as pickle
 from struct import pack, unpack
@@ -238,15 +236,14 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
         # Do recovery and consistency checks
         self._withlock(self._dorecovery)
-        # Set up the autopacking thread
+
+    def _make_autopacker(self, poll):
         config = self._config
-        if config.frequency > 0:
-            lastpacktime = U64(self._last_packtime())
-            self._autopacker = _Autopack(
-                self, config.frequency,
-                config.packtime, config.classicpack,
-                lastpacktime)
-            self._autopacker.start()
+        lastpacktime = U64(self._last_packtime())
+        return _Autopack(
+            self, poll, config.frequency,
+            config.packtime, config.classicpack,
+            lastpacktime)

     def _dorecovery(self):
         # If these tables are non-empty, it means we crashed during a pack
@@ -1861,16 +1858,16 @@ class _Record:
 class _Autopack(_WorkThread):
-    def __init__(self, storage, frequency, packtime, classicpack,
+    def __init__(self, storage, poll, frequency, packtime, classicpack,
                  lastpacktime):
-        _WorkThread.__init__(self, storage, frequency, 'autopacking')
+        _WorkThread.__init__(self, storage, poll, frequency, 'autopacking')
         self._packtime = packtime
         self._classicpack = classicpack
         # Bookkeeping
         self._stop = False
         self._lastclassic = 0

-    def _dowork(self, now):
+    def _dowork(self):
         # Should we do a classic pack this time?
         if self._classicpack <= 0:
             classicp = False
@@ -1879,4 +1876,4 @@ class _Autopack(_WorkThread):
             self._lastclassic = v
             classicp = not v
         # Run the autopack phase
-        self._storage.autopack(now - self._packtime, classicp)
+        self._storage.autopack(time.time() - self._packtime, classicp)

Minimal.py

@@ -15,10 +15,7 @@
 """Berkeley storage without undo or versioning.
 """
-__version__ = '$Revision: 1.20 $'[-2:][0]
+__version__ = '$Revision: 1.21 $'[-2:][0]

-import time
-import threading
 # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
 # http://pybsddb.sourceforge.net.  It is compatible with release 3.4 of
@@ -26,7 +23,7 @@
 from bsddb3 import db
 from ZODB import POSException
-from ZODB.utils import U64, p64
+from ZODB.utils import p64, U64
 from ZODB.referencesf import referencesf
 from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
@@ -40,12 +37,6 @@
 PRESENT = 'X'
 ZERO = '\0'*8

-# Number of seconds for the autopack thread to sleep before checking to see if
-# it's time for another autopack run.  Lower numbers mean more processing,
-# higher numbers mean less responsiveness to shutdown requests.  10 seconds
-# seems like a good compromise.
-AUTOPACK_CHECK_SLEEP = 10
-
 try:
     True, False
 except NameError:
@@ -131,11 +122,9 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
             self._withtxn(self._docommit, tid)
         finally:
             self._lock_release()
-        # Set up the autopacking thread
-        if self._config.frequency > 0:
-            config = self._config
-            self._autopacker = _Autopack(self, config.frequency)
-            self._autopacker.start()
+
+    def _make_autopacker(self, poll):
+        return _Autopack(self, poll, self._config.frequency)

     def _doabort(self, txn, tid):
         co = cs = None
@@ -548,9 +537,9 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
 class _Autopack(_WorkThread):
-    def __init__(self, storage, frequency):
-        _WorkThread.__init__(self, storage, frequency, 'autopacking')
+    def __init__(self, storage, poll, frequency):
+        _WorkThread.__init__(self, storage, poll, frequency, 'autopacking')

-    def _dowork(self, now):
+    def _dowork(self):
         # Run the autopack phase
         self._storage.pack('ignored', referencesf)

BerkeleyBase.py

@@ -14,11 +14,12 @@
 """Base class for BerkeleyStorage implementations.
 """
-__version__ = '$Revision: 1.29 $'.split()[-2:][0]
+__version__ = '$Revision: 1.30 $'.split()[-2:][0]

 import os
 import time
 import errno
+import select
 import threading
 from types import StringType
@@ -28,7 +29,6 @@ from bsddb3 import db
 # BaseStorage provides primitives for lock acquisition and release, and a host
 # of other methods, some of which are overridden here, some of which are not.
-from ZODB import POSException
 from ZODB.lock_file import lock_file
 from ZODB.BaseStorage import BaseStorage
 from ZODB.referencesf import referencesf
@@ -37,12 +37,11 @@ import zLOG
 GBYTES = 1024 * 1024 * 1000

-# Maximum number of seconds for background thread to sleep before checking to
-# see if it's time for another autopack run.  Lower numbers mean more
-# processing, higher numbers mean less responsiveness to shutdown requests.
-# 10 seconds seems like a good compromise.  Note that if the check interval is
-# less than the sleep time, the minimum will be used.
-SLEEP_TIME = 10
+# How long should we wait to join one of the background daemon threads?  It's
+# a good idea to not set this too short, or we could corrupt our database.
+# That would be recoverable, but recovery could take a long time too, so it's
+# better to shut down cleanly.
+JOIN_TIME = 10

 try:
     True, False
@@ -189,7 +188,6 @@ class BerkeleyBase(BaseStorage):
         # Instantiate a pack lock
         self._packlock = ThreadLock.allocate_lock()
-        self._autopacker = None
         self._stop = self._closed = False
         # Initialize a few other things
         self._prefix = prefix
@@ -199,11 +197,27 @@
         self._setupDBs()
         # Initialize the object id counter.
         self._init_oid()
+        # Set up the checkpointing thread
         if config.interval > 0:
-            self._checkpointer = _Checkpoint(self, config.interval)
+            r, self._checkpointfd = os.pipe()
+            poll = select.poll()
+            poll.register(r, select.POLLIN)
+            self._checkpointer = _Checkpoint(self, poll, config.interval)
             self._checkpointer.start()
         else:
             self._checkpointer = None
+        # Set up the autopacking thread
+        if config.frequency > 0:
+            r, self._autopackfd = os.pipe()
+            poll = select.poll()
+            poll.register(r, select.POLLIN)
+            self._autopacker = self._make_autopacker(poll)
+            self._autopacker.start()
+        else:
+            self._autopacker = None
+
+    def _make_autopacker(self, poll):
+        raise NotImplementedError

     def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
         """Open an individual database with the given flags.
@@ -321,9 +335,21 @@
         tables are closed, and finally the environment is force checkpointed
         and closed too.
         """
-        # Set this flag before acquiring the lock so we don't block waiting
-        # for the autopack thread to give up the lock.
+        # We have to shut down the background threads before we acquire the
+        # lock, or we could end up closing the environment before the
+        # autopacking thread exits.
         self._stop = True
+        # Stop the autopacker thread
+        if self._autopacker:
+            self.log('stopping autopacking thread')
+            self._autopacker.stop()
+            os.write(self._autopackfd, 'STOP')
+            self._autopacker.join(JOIN_TIME)
+        if self._checkpointer:
+            self.log('stopping checkpointing thread')
+            self._checkpointer.stop()
+            os.write(self._checkpointfd, 'STOP')
+            self._checkpointer.join(JOIN_TIME)
         self._lock_acquire()
         try:
             if not self._closed:
@@ -333,15 +359,6 @@
     def _doclose(self):
-        # Stop the autopacker thread
-        if self._autopacker:
-            self.log('stopping autopacking thread')
-            self._autopacker.stop()
-            self._autopacker.join(SLEEP_TIME * 2)
-        if self._checkpointer:
-            self.log('stopping checkpointing thread')
-            self._checkpointer.stop()
-            self._checkpointer.join(SLEEP_TIME * 2)
         # Close all the tables
         for d in self._tables:
             d.close()
@@ -426,30 +443,36 @@ def env_from_string(envname, config):
     lock_file(lockfile)
     lockfile.write(str(os.getpid()))
     lockfile.flush()
-    # Create, initialize, and open the environment
-    env = db.DBEnv()
-    if config.logdir is not None:
-        env.set_lg_dir(config.logdir)
-    gbytes, bytes = divmod(config.cachesize, GBYTES)
-    env.set_cachesize(gbytes, bytes)
-    env.open(envname,
-             db.DB_CREATE          # create underlying files as necessary
-             | db.DB_RECOVER       # run normal recovery before opening
-             | db.DB_INIT_MPOOL    # initialize shared memory buffer pool
-             | db.DB_INIT_TXN      # initialize transaction subsystem
-             | db.DB_THREAD        # we use the environment from other threads
-             )
+    try:
+        # Create, initialize, and open the environment
+        env = db.DBEnv()
+        if config.logdir is not None:
+            env.set_lg_dir(config.logdir)
+        gbytes, bytes = divmod(config.cachesize, GBYTES)
+        env.set_cachesize(gbytes, bytes)
+        env.open(envname,
+                 db.DB_CREATE          # create underlying files as necessary
+                 | db.DB_RECOVER       # run normal recovery before opening
+                 | db.DB_INIT_MPOOL    # initialize shared memory buffer pool
+                 | db.DB_INIT_TXN      # initialize transaction subsystem
+                 | db.DB_THREAD        # we use the env from multiple threads
+                 )
+    except:
+        lockfile.close()
+        raise
     return env, lockfile


 class _WorkThread(threading.Thread):
-    def __init__(self, storage, checkinterval, name='work'):
+    def __init__(self, storage, poll, checkinterval, name='work'):
         threading.Thread.__init__(self)
         self._storage = storage
+        self._poll = poll
         self._interval = checkinterval
         self._name = name
-        # Bookkeeping
+        # Bookkeeping.  _nextcheck is useful as a non-public interface aiding
+        # testing.  See test_autopack.py.
         self._stop = False
         self._nextcheck = checkinterval
         # We don't want these threads to hold up process exit.  That could
@@ -461,27 +484,34 @@ class _WorkThread(threading.Thread):
         self._storage.log('%s thread started', name)
         while not self._stop:
             now = time.time()
-            if now > self._nextcheck:
-                self._storage.log('running %s', name)
-                self._dowork(now)
-                self._nextcheck = now + self._interval
-            # Now we sleep for a little while before we check again.  Sleep
-            # for the minimum of self._interval and SLEEP_TIME so as to be as
-            # responsive as possible to .stop() calls.
-            time.sleep(min(self._interval, SLEEP_TIME))
+            if now < self._nextcheck:
+                continue
+            self._storage.log('running %s', name)
+            self._dowork()
+            self._nextcheck = now + self._interval
+            # Now we sleep for a little while before we check again.  We use
+            # a poll timeout so that when the parent thread writes its "stop
+            # marker" to the readfd, we'll exit immediately.
+            fds = self._poll.poll(self._interval * 1000)
+            for fd, event in fds:
+                # Just read and throw away the data.  The _stop flag will
+                # already have been set if we're being shut down.
+                if event & select.POLLIN:
+                    os.read(fd, 1024)
         self._storage.log('%s thread finished', name)

     def stop(self):
         self._stop = True

-    def _dowork(self, now):
+    def _dowork(self):
         pass


 class _Checkpoint(_WorkThread):
-    def __init__(self, storage, interval):
-        _WorkThread.__init__(self, storage, interval, 'checkpointing')
+    def __init__(self, storage, poll, interval):
+        _WorkThread.__init__(self, storage, poll, interval, 'checkpointing')

-    def _dowork(self, now):
+    def _dowork(self):
         self._storage.docheckpoint()