Commit f595bdbf authored by Barry Warsaw's avatar Barry Warsaw

Simplify again the WorkThread api. Don't use a non-portable poll

object -- because we just care about one event we can simply use a
threading.Event object.  Specific changes include,

BerkeleyBase.py

    __init__(): Simplify the creation of the checkpointer and
    autopacker worker threads.

    close(): Replace autopacker.stop() with setting the Event object.
    This both kicks us out of the wait() and sets the thread's
    internal stop flag, so it's all we need.

    _WorkThread.init(): Take the `name' argument out of the
    constructor.  It was the only thing that 2/3rds of the subclasses
    needed to override, so just stick it in a class attribute.

    run(): Simplify to use the Event object.  Also, change _nextcheck
    to recalculate `now' after the work is done.  There's no telling
    how much time the work will take (it may not matter much in
    practice).

    stop(): Removed.

    _Checkpoint.__init__(): Removed

Full.py

    _make_autopacker(): Updated

    _Autopack.__init__(): Updated

Minimal.py

    _make_autopacker(): Updated

    _Autopack.__init__(): Removed
parent 6fd17cc8
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support. """Berkeley storage with full undo and versioning support.
""" """
__version__ = '$Revision: 1.57 $'.split()[-2:][0] __version__ = '$Revision: 1.58 $'.split()[-2:][0]
import time import time
import cPickle as pickle import cPickle as pickle
...@@ -237,12 +237,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -237,12 +237,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# Do recovery and consistency checks # Do recovery and consistency checks
self._withlock(self._dorecovery) self._withlock(self._dorecovery)
def _make_autopacker(self, poll): def _make_autopacker(self, event):
config = self._config config = self._config
lastpacktime = U64(self._last_packtime()) lastpacktime = U64(self._last_packtime())
return _Autopack( return _Autopack(
self, poll, config.frequency, self, event,
config.packtime, config.classicpack, config.frequency, config.packtime, config.classicpack,
lastpacktime) lastpacktime)
def _dorecovery(self): def _dorecovery(self):
...@@ -1858,9 +1858,12 @@ class _Record: ...@@ -1858,9 +1858,12 @@ class _Record:
class _Autopack(_WorkThread): class _Autopack(_WorkThread):
def __init__(self, storage, poll, frequency, packtime, classicpack, NAME = 'autopacking'
def __init__(self, storage, event,
frequency, packtime, classicpack,
lastpacktime): lastpacktime):
_WorkThread.__init__(self, storage, poll, frequency, 'autopacking') _WorkThread.__init__(self, storage, event, frequency)
self._packtime = packtime self._packtime = packtime
self._classicpack = classicpack self._classicpack = classicpack
# Bookkeeping # Bookkeeping
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage without undo or versioning. """Berkeley storage without undo or versioning.
""" """
__version__ = '$Revision: 1.21 $'[-2:][0] __version__ = '$Revision: 1.22 $'[-2:][0]
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net. It is compatible with release 3.4 of # http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
...@@ -123,8 +123,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -123,8 +123,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
finally: finally:
self._lock_release() self._lock_release()
def _make_autopacker(self, poll): def _make_autopacker(self, event):
return _Autopack(self, poll, self._config.frequency) return _Autopack(self, event, self._config.frequency)
def _doabort(self, txn, tid): def _doabort(self, txn, tid):
co = cs = None co = cs = None
...@@ -537,8 +537,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -537,8 +537,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
class _Autopack(_WorkThread): class _Autopack(_WorkThread):
def __init__(self, storage, poll, frequency): NAME = 'autopacking'
_WorkThread.__init__(self, storage, poll, frequency, 'autopacking')
def _dowork(self): def _dowork(self):
# Run the autopack phase # Run the autopack phase
......
...@@ -14,12 +14,11 @@ ...@@ -14,12 +14,11 @@
"""Base class for BerkeleyStorage implementations. """Base class for BerkeleyStorage implementations.
""" """
__version__ = '$Revision: 1.30 $'.split()[-2:][0] __version__ = '$Revision: 1.31 $'.split()[-2:][0]
import os import os
import time import time
import errno import errno
import select
import threading import threading
from types import StringType from types import StringType
...@@ -199,19 +198,15 @@ class BerkeleyBase(BaseStorage): ...@@ -199,19 +198,15 @@ class BerkeleyBase(BaseStorage):
self._init_oid() self._init_oid()
# Set up the checkpointing thread # Set up the checkpointing thread
if config.interval > 0: if config.interval > 0:
r, self._checkpointfd = os.pipe() self._checkpointstop = event = threading.Event()
poll = select.poll() self._checkpointer = _Checkpoint(self, event, config.interval)
poll.register(r, select.POLLIN)
self._checkpointer = _Checkpoint(self, poll, config.interval)
self._checkpointer.start() self._checkpointer.start()
else: else:
self._checkpointer = None self._checkpointer = None
# Set up the autopacking thread # Set up the autopacking thread
if config.frequency > 0: if config.frequency > 0:
r, self._autopackfd = os.pipe() self._autopackstop = event = threading.Event()
poll = select.poll() self._autopacker = self._make_autopacker(event)
poll.register(r, select.POLLIN)
self._autopacker = self._make_autopacker(poll)
self._autopacker.start() self._autopacker.start()
else: else:
self._autopacker = None self._autopacker = None
...@@ -342,13 +337,13 @@ class BerkeleyBase(BaseStorage): ...@@ -342,13 +337,13 @@ class BerkeleyBase(BaseStorage):
# Stop the autopacker thread # Stop the autopacker thread
if self._autopacker: if self._autopacker:
self.log('stopping autopacking thread') self.log('stopping autopacking thread')
self._autopacker.stop() # Setting the event also toggles the stop flag
os.write(self._autopackfd, 'STOP') self._autopackstop.set()
self._autopacker.join(JOIN_TIME) self._autopacker.join(JOIN_TIME)
if self._checkpointer: if self._checkpointer:
self.log('stopping checkpointing thread') self.log('stopping checkpointing thread')
self._checkpointer.stop() # Setting the event also toggles the stop flag
os.write(self._checkpointfd, 'STOP') self._checkpointstop.set()
self._checkpointer.join(JOIN_TIME) self._checkpointer.join(JOIN_TIME)
self._lock_acquire() self._lock_acquire()
try: try:
...@@ -465,12 +460,13 @@ def env_from_string(envname, config): ...@@ -465,12 +460,13 @@ def env_from_string(envname, config):
class _WorkThread(threading.Thread): class _WorkThread(threading.Thread):
def __init__(self, storage, poll, checkinterval, name='work'): NAME = 'worker'
def __init__(self, storage, event, checkinterval):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self._storage = storage self._storage = storage
self._poll = poll self._event = event
self._interval = checkinterval self._interval = checkinterval
self._name = name
# Bookkeeping. _nextcheck is useful as a non-public interface aiding # Bookkeeping. _nextcheck is useful as a non-public interface aiding
# testing. See test_autopack.py. # testing. See test_autopack.py.
self._stop = False self._stop = False
...@@ -480,38 +476,28 @@ class _WorkThread(threading.Thread): ...@@ -480,38 +476,28 @@ class _WorkThread(threading.Thread):
self.setDaemon(True) self.setDaemon(True)
def run(self): def run(self):
name = self._name name = self.NAME
self._storage.log('%s thread started', name) self._storage.log('%s thread started', name)
while not self._stop: while not self._stop:
now = time.time() now = time.time()
if now < self._nextcheck: if now >= self._nextcheck:
continue self._storage.log('running %s', name)
self._storage.log('running %s', name) self._dowork()
self._dowork() # Recalculate `now' because _dowork() could have taken a
self._nextcheck = now + self._interval # while. time.time() can be expensive, but oh well.
# Now we sleep for a little while before we check again. We use a self._nextcheck = time.time() + self._interval
# poll timeout so that when the parent thread writes its "stop # Block w/ timeout on the shutdown event.
# marker" to the readfd, we'll exit out immediately. self._event.wait(self._interval)
fds = self._poll.poll(self._interval * 1000) self._stop = self._event.isSet()
for fd, event in self._poll.poll(self._interval):
# Just read and throw away the data. The _stop flag will
# already have been set if we're being shutdown.
if event & select.POLLIN:
#print name, 'data:', os.read(fd, 1024)
os.read(fd, 1024)
self._storage.log('%s thread finished', name) self._storage.log('%s thread finished', name)
def stop(self):
self._stop = True
def _dowork(self): def _dowork(self):
pass pass
class _Checkpoint(_WorkThread): class _Checkpoint(_WorkThread):
def __init__(self, storage, poll, interval): NAME = 'checkpointing'
_WorkThread.__init__(self, storage, poll, interval, 'checkpointing')
def _dowork(self): def _dowork(self):
self._storage.docheckpoint() self._storage.docheckpoint()
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support. """Berkeley storage with full undo and versioning support.
""" """
__version__ = '$Revision: 1.57 $'.split()[-2:][0] __version__ = '$Revision: 1.58 $'.split()[-2:][0]
import time import time
import cPickle as pickle import cPickle as pickle
...@@ -237,12 +237,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -237,12 +237,12 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# Do recovery and consistency checks # Do recovery and consistency checks
self._withlock(self._dorecovery) self._withlock(self._dorecovery)
def _make_autopacker(self, poll): def _make_autopacker(self, event):
config = self._config config = self._config
lastpacktime = U64(self._last_packtime()) lastpacktime = U64(self._last_packtime())
return _Autopack( return _Autopack(
self, poll, config.frequency, self, event,
config.packtime, config.classicpack, config.frequency, config.packtime, config.classicpack,
lastpacktime) lastpacktime)
def _dorecovery(self): def _dorecovery(self):
...@@ -1858,9 +1858,12 @@ class _Record: ...@@ -1858,9 +1858,12 @@ class _Record:
class _Autopack(_WorkThread): class _Autopack(_WorkThread):
def __init__(self, storage, poll, frequency, packtime, classicpack, NAME = 'autopacking'
def __init__(self, storage, event,
frequency, packtime, classicpack,
lastpacktime): lastpacktime):
_WorkThread.__init__(self, storage, poll, frequency, 'autopacking') _WorkThread.__init__(self, storage, event, frequency)
self._packtime = packtime self._packtime = packtime
self._classicpack = classicpack self._classicpack = classicpack
# Bookkeeping # Bookkeeping
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage without undo or versioning. """Berkeley storage without undo or versioning.
""" """
__version__ = '$Revision: 1.21 $'[-2:][0] __version__ = '$Revision: 1.22 $'[-2:][0]
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net. It is compatible with release 3.4 of # http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
...@@ -123,8 +123,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -123,8 +123,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
finally: finally:
self._lock_release() self._lock_release()
def _make_autopacker(self, poll): def _make_autopacker(self, event):
return _Autopack(self, poll, self._config.frequency) return _Autopack(self, event, self._config.frequency)
def _doabort(self, txn, tid): def _doabort(self, txn, tid):
co = cs = None co = cs = None
...@@ -537,8 +537,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -537,8 +537,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
class _Autopack(_WorkThread): class _Autopack(_WorkThread):
def __init__(self, storage, poll, frequency): NAME = 'autopacking'
_WorkThread.__init__(self, storage, poll, frequency, 'autopacking')
def _dowork(self): def _dowork(self):
# Run the autopack phase # Run the autopack phase
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment