Commit d5ce0a93 authored by Barry Warsaw's avatar Barry Warsaw

Refactoring.

close(): The base class can now do all the work of closing, including
stopping the background threads.

log(): Much more convenient.
parent c6502e01
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support. """Berkeley storage with full undo and versioning support.
""" """
__version__ = '$Revision: 1.54 $'.split()[-2:][0] __version__ = '$Revision: 1.55 $'.split()[-2:][0]
import sys import sys
import time import time
...@@ -33,7 +33,6 @@ from ZODB.utils import p64, U64 ...@@ -33,7 +33,6 @@ from ZODB.utils import p64, U64
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
import zLOG
# BerkeleyBase.BerkeleyBase class provides some common functionality for both # BerkeleyBase.BerkeleyBase class provides some common functionality for both
# the Full and Minimal implementations. It in turn inherits from # the Full and Minimal implementations. It in turn inherits from
...@@ -296,24 +295,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -296,24 +295,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
else: else:
self.__ltid = ZERO self.__ltid = ZERO
def close(self):
# Set this flag before acquiring the lock so we don't block waiting
# for the autopack thread to give up the lock.
self._stop = True
self._lock_acquire()
try:
# We must stop the autopacker and checkpointing threads first
# before closing any tables. I'm not sure about the join()
# timeout, but I'd be surprised if any particular iteration of a
# pack-related loops take longer than a few seconds.
if self._autopacker:
zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join(30)
BerkeleyBase.close(self)
finally:
self._lock_release()
def _doabort(self, txn, tid): def _doabort(self, txn, tid):
# First clean up the oid indexed (or oid+tid indexed) tables. # First clean up the oid indexed (or oid+tid indexed) tables.
co = cs = ct = cv = None co = cs = ct = cv = None
...@@ -1350,7 +1331,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -1350,7 +1331,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# to pass that around to the helper methods, so just assert they're # to pass that around to the helper methods, so just assert they're
# the same. # the same.
assert zreferencesf == referencesf assert zreferencesf == referencesf
zLOG.LOG('Full storage', zLOG.INFO, 'classic pack started') self.log('classic pack started')
# A simple wrapper around the bulk of packing, but which acquires a # A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time. # lock that prevents multiple packs from running at the same time.
self._packlock.acquire() self._packlock.acquire()
...@@ -1362,7 +1343,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -1362,7 +1343,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._dopack(t) self._dopack(t)
finally: finally:
self._packlock.release() self._packlock.release()
zLOG.LOG('Full storage', zLOG.INFO, 'classic pack finished') self.log('classic pack finished')
def _dopack(self, t, gc=True): def _dopack(self, t, gc=True):
# t is a TimeTime, or time float, convert this to a TimeStamp object, # t is a TimeTime, or time float, convert this to a TimeStamp object,
...@@ -1420,9 +1401,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -1420,9 +1401,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
Autopacking is different than classic pack() in that it doesn't do Autopacking is different than classic pack() in that it doesn't do
cyclic garbage detection unless the gc flag is True. cyclic garbage detection unless the gc flag is True.
""" """
zLOG.LOG('Full storage', zLOG.INFO, self.log('autopack started (packtime: %s, gc? %s)', t,
'autopack started (packtime: %s, gc? %s)' (gc and 'yes' or 'no'))
% (t, gc and 'yes' or 'no'))
# A simple wrapper around the bulk of packing, but which acquires a # A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time. # lock that prevents multiple packs from running at the same time.
self._packlock.acquire() self._packlock.acquire()
...@@ -1434,7 +1414,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -1434,7 +1414,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._dopack(t, gc) self._dopack(t, gc)
finally: finally:
self._packlock.release() self._packlock.release()
zLOG.LOG('Full storage', zLOG.INFO, 'autopack finished') self.log('autopack finished')
def _collect_revs(self, txn, packtid): def _collect_revs(self, txn, packtid):
ct = co = None ct = co = None
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage without undo or versioning. """Berkeley storage without undo or versioning.
""" """
__version__ = '$Revision: 1.17 $'[-2:][0] __version__ = '$Revision: 1.18 $'[-2:][0]
import time import time
import threading import threading
...@@ -29,7 +29,6 @@ from ZODB import POSException ...@@ -29,7 +29,6 @@ from ZODB import POSException
from ZODB.utils import U64, p64 from ZODB.utils import U64, p64
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
import zLOG
# BerkeleyBase class provides some common functionality for BerkeleyDB-based # BerkeleyBase class provides some common functionality for BerkeleyDB-based
# storages. It in turn inherits from BaseStorage which itself provides some # storages. It in turn inherits from BaseStorage which itself provides some
...@@ -138,25 +137,6 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -138,25 +137,6 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
self._autopacker = _Autopack(self, config.frequency) self._autopacker = _Autopack(self, config.frequency)
self._autopacker.start() self._autopacker.start()
def close(self):
# Set this flag before acquiring the lock so we don't block waiting
# for the autopack thread to give up the lock.
self._stop = True
self._lock_acquire()
try:
# We must stop the autopacker and checkpointing threads first
# before closing any tables. I'm not sure about the join()
# timeout, but I'd be surprised if any particular iteration of a
# pack-related loops take longer than a few seconds.
if self._autopacker:
zLOG.LOG('Minimal storage', zLOG.INFO,
'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join(30)
BerkeleyBase.close(self)
finally:
self._lock_release()
def _doabort(self, txn, tid): def _doabort(self, txn, tid):
co = cs = None co = cs = None
try: try:
...@@ -377,7 +357,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -377,7 +357,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
# to pass that around to the helper methods, so just assert they're # to pass that around to the helper methods, so just assert they're
# the same. # the same.
assert zreferencesf == referencesf assert zreferencesf == referencesf
zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack started') self.log('classic pack started')
# A simple wrapper around the bulk of packing, but which acquires a # A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time. # lock that prevents multiple packs from running at the same time.
self._packlock.acquire() self._packlock.acquire()
...@@ -392,7 +372,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -392,7 +372,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
self._dopack() self._dopack()
finally: finally:
self._packlock.release() self._packlock.release()
zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack finished') self.log('classic pack finished')
def _dopack(self): def _dopack(self):
# Do a mark and sweep for garbage collection. Calculate the set of # Do a mark and sweep for garbage collection. Calculate the set of
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
"""Base class for BerkeleyStorage implementations. """Base class for BerkeleyStorage implementations.
""" """
__version__ = '$Revision: 1.25 $'.split()[-2:][0] __version__ = '$Revision: 1.26 $'.split()[-2:][0]
import os import os
import time import time
...@@ -301,16 +301,36 @@ class BerkeleyBase(BaseStorage): ...@@ -301,16 +301,36 @@ class BerkeleyBase(BaseStorage):
# performed by the methods in the derived storage class. # performed by the methods in the derived storage class.
pass pass
def log(self, msg, *args):
zLOG.LOG(self.__class__.__name__, zLOG.INFO, msg % args)
def close(self): def close(self):
"""Close the storage by closing the databases it uses and by closing """Close the storage.
its environment.
All background threads are stopped and joined first, then all the
tables are closed, and finally the environment is force checkpointed
and closed too.
""" """
# Close all the tables # Set this flag before acquiring the lock so we don't block waiting
# for the autopack thread to give up the lock.
self._stop = True
self._lock_acquire()
try:
self._doclose()
finally:
self._lock_release()
def _doclose(self):
# Stop the autopacker thread
if self._autopacker:
self.log('stopping autopacking thread')
self._autopacker.stop()
self._autopacker.join(SLEEP_TIME * 2)
if self._checkpointer: if self._checkpointer:
zLOG.LOG('Full storage', zLOG.INFO, self.log('stopping checkpointing thread')
'stopping checkpointing thread')
self._checkpointer.stop() self._checkpointer.stop()
self._checkpointer.join(SLEEP_TIME * 2) self._checkpointer.join(SLEEP_TIME * 2)
# Close all the tables
for d in self._tables: for d in self._tables:
d.close() d.close()
# As recommended by Keith Bostic @ Sleepycat, we need to do # As recommended by Keith Bostic @ Sleepycat, we need to do
...@@ -426,18 +446,18 @@ class _WorkThread(threading.Thread): ...@@ -426,18 +446,18 @@ class _WorkThread(threading.Thread):
def run(self): def run(self):
name = self._name name = self._name
zLOG.LOG('Berkeley storage', zLOG.INFO, '%s thread started' % name) self._storage.log('%s thread started', name)
while not self._stop: while not self._stop:
now = time.time() now = time.time()
if now > self._nextcheck: if now > self._nextcheck:
zLOG.LOG('Berkeley storage', zLOG.INFO, 'running %s' % name) self._storage.log('running %s', name)
self._dowork(now) self._dowork(now)
self._nextcheck = now + self._interval self._nextcheck = now + self._interval
# Now we sleep for a little while before we check again. Sleep # Now we sleep for a little while before we check again. Sleep
# for the minimum of self._interval and SLEEP_TIME so as to be as # for the minimum of self._interval and SLEEP_TIME so as to be as
# responsive as possible to .stop() calls. # responsive as possible to .stop() calls.
time.sleep(min(self._interval, SLEEP_TIME)) time.sleep(min(self._interval, SLEEP_TIME))
zLOG.LOG('Berkeley storage', zLOG.INFO, '%s thread finished' % name) self._storage.log('%s thread finished', name)
def stop(self): def stop(self):
self._stop = True self._stop = True
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support. """Berkeley storage with full undo and versioning support.
""" """
__version__ = '$Revision: 1.54 $'.split()[-2:][0] __version__ = '$Revision: 1.55 $'.split()[-2:][0]
import sys import sys
import time import time
...@@ -33,7 +33,6 @@ from ZODB.utils import p64, U64 ...@@ -33,7 +33,6 @@ from ZODB.utils import p64, U64
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
import zLOG
# BerkeleyBase.BerkeleyBase class provides some common functionality for both # BerkeleyBase.BerkeleyBase class provides some common functionality for both
# the Full and Minimal implementations. It in turn inherits from # the Full and Minimal implementations. It in turn inherits from
...@@ -296,24 +295,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -296,24 +295,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
else: else:
self.__ltid = ZERO self.__ltid = ZERO
def close(self):
# Set this flag before acquiring the lock so we don't block waiting
# for the autopack thread to give up the lock.
self._stop = True
self._lock_acquire()
try:
# We must stop the autopacker and checkpointing threads first
# before closing any tables. I'm not sure about the join()
# timeout, but I'd be surprised if any particular iteration of a
# pack-related loops take longer than a few seconds.
if self._autopacker:
zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join(30)
BerkeleyBase.close(self)
finally:
self._lock_release()
def _doabort(self, txn, tid): def _doabort(self, txn, tid):
# First clean up the oid indexed (or oid+tid indexed) tables. # First clean up the oid indexed (or oid+tid indexed) tables.
co = cs = ct = cv = None co = cs = ct = cv = None
...@@ -1350,7 +1331,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -1350,7 +1331,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# to pass that around to the helper methods, so just assert they're # to pass that around to the helper methods, so just assert they're
# the same. # the same.
assert zreferencesf == referencesf assert zreferencesf == referencesf
zLOG.LOG('Full storage', zLOG.INFO, 'classic pack started') self.log('classic pack started')
# A simple wrapper around the bulk of packing, but which acquires a # A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time. # lock that prevents multiple packs from running at the same time.
self._packlock.acquire() self._packlock.acquire()
...@@ -1362,7 +1343,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -1362,7 +1343,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._dopack(t) self._dopack(t)
finally: finally:
self._packlock.release() self._packlock.release()
zLOG.LOG('Full storage', zLOG.INFO, 'classic pack finished') self.log('classic pack finished')
def _dopack(self, t, gc=True): def _dopack(self, t, gc=True):
# t is a TimeTime, or time float, convert this to a TimeStamp object, # t is a TimeTime, or time float, convert this to a TimeStamp object,
...@@ -1420,9 +1401,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -1420,9 +1401,8 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
Autopacking is different than classic pack() in that it doesn't do Autopacking is different than classic pack() in that it doesn't do
cyclic garbage detection unless the gc flag is True. cyclic garbage detection unless the gc flag is True.
""" """
zLOG.LOG('Full storage', zLOG.INFO, self.log('autopack started (packtime: %s, gc? %s)', t,
'autopack started (packtime: %s, gc? %s)' (gc and 'yes' or 'no'))
% (t, gc and 'yes' or 'no'))
# A simple wrapper around the bulk of packing, but which acquires a # A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time. # lock that prevents multiple packs from running at the same time.
self._packlock.acquire() self._packlock.acquire()
...@@ -1434,7 +1414,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage): ...@@ -1434,7 +1414,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._dopack(t, gc) self._dopack(t, gc)
finally: finally:
self._packlock.release() self._packlock.release()
zLOG.LOG('Full storage', zLOG.INFO, 'autopack finished') self.log('autopack finished')
def _collect_revs(self, txn, packtid): def _collect_revs(self, txn, packtid):
ct = co = None ct = co = None
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Berkeley storage without undo or versioning. """Berkeley storage without undo or versioning.
""" """
__version__ = '$Revision: 1.17 $'[-2:][0] __version__ = '$Revision: 1.18 $'[-2:][0]
import time import time
import threading import threading
...@@ -29,7 +29,6 @@ from ZODB import POSException ...@@ -29,7 +29,6 @@ from ZODB import POSException
from ZODB.utils import U64, p64 from ZODB.utils import U64, p64
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
import zLOG
# BerkeleyBase class provides some common functionality for BerkeleyDB-based # BerkeleyBase class provides some common functionality for BerkeleyDB-based
# storages. It in turn inherits from BaseStorage which itself provides some # storages. It in turn inherits from BaseStorage which itself provides some
...@@ -138,25 +137,6 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -138,25 +137,6 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
self._autopacker = _Autopack(self, config.frequency) self._autopacker = _Autopack(self, config.frequency)
self._autopacker.start() self._autopacker.start()
def close(self):
# Set this flag before acquiring the lock so we don't block waiting
# for the autopack thread to give up the lock.
self._stop = True
self._lock_acquire()
try:
# We must stop the autopacker and checkpointing threads first
# before closing any tables. I'm not sure about the join()
# timeout, but I'd be surprised if any particular iteration of a
# pack-related loops take longer than a few seconds.
if self._autopacker:
zLOG.LOG('Minimal storage', zLOG.INFO,
'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join(30)
BerkeleyBase.close(self)
finally:
self._lock_release()
def _doabort(self, txn, tid): def _doabort(self, txn, tid):
co = cs = None co = cs = None
try: try:
...@@ -377,7 +357,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -377,7 +357,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
# to pass that around to the helper methods, so just assert they're # to pass that around to the helper methods, so just assert they're
# the same. # the same.
assert zreferencesf == referencesf assert zreferencesf == referencesf
zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack started') self.log('classic pack started')
# A simple wrapper around the bulk of packing, but which acquires a # A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time. # lock that prevents multiple packs from running at the same time.
self._packlock.acquire() self._packlock.acquire()
...@@ -392,7 +372,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage): ...@@ -392,7 +372,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
self._dopack() self._dopack()
finally: finally:
self._packlock.release() self._packlock.release()
zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack finished') self.log('classic pack finished')
def _dopack(self): def _dopack(self):
# Do a mark and sweep for garbage collection. Calculate the set of # Do a mark and sweep for garbage collection. Calculate the set of
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment