Commit 0dc859e1 authored by Barry Warsaw

_setupDBs(): Use the new extended _setupDB() arguments for the DB_QUEUE
tables.

close(): We can really simplify this and make it more robust for when
we add new tables, by relying on the fact that the base class
maintains its own list of opened tables, and the base class close()
method closes them in turn.

autopack(): Default the gc argument to False.

_collect_objs(), _mark(), _sweep(): Add an escape hatch for the pack
operation inside the inner loops of each of these methods.  That way,
we don't have to wait until the loops are finished to exit the pack
operation, if stop() has been requested by the main thread.

_Autopack: Use the _WorkThread base class.
parent 71f1008d
@@ -15,7 +15,7 @@
 """Berkeley storage without undo or versioning.
 """
 
-__version__ = '$Revision: 1.15 $'[-2:][0]
+__version__ = '$Revision: 1.16 $'[-2:][0]
 
 import time
 import threading
@@ -25,16 +25,17 @@ import threading
 # PyBSDDB3.
 from bsddb3 import db
-# BerkeleyBase class provides some common functionality for BerkeleyDB-based
-# storages. It in turn inherits from BaseStorage which itself provides some
-# common storage functionality.
-from BerkeleyBase import BerkeleyBase
 from ZODB import POSException
 from ZODB.utils import U64, p64
 from ZODB.referencesf import referencesf
 from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
 import zLOG
+# BerkeleyBase class provides some common functionality for BerkeleyDB-based
+# storages. It in turn inherits from BaseStorage which itself provides some
+# common storage functionality.
+from BerkeleyBase import BerkeleyBase, PackStop, _WorkThread
 ABORT = 'A'
 COMMIT = 'C'
 PRESENT = 'X'
@@ -112,11 +113,7 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
         self._pending = self._setupDB('pending')
         # Tables to support packing.
         self._packmark = self._setupDB('packmark')
-        self._oidqueue = db.DB(self._env)
-        self._oidqueue.set_re_len(8)
-        # BAW: do we need to set the queue extent size?
-        self._oidqueue.open(self._prefix + 'oidqueue',
-                            db.DB_QUEUE, db.DB_CREATE)
+        self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 8)
         # Do recovery and consistency checks
         pendings = self._pending.keys()
         assert len(pendings) <= 1
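Note: the extended _setupDB() arguments used above ('oidqueue', 0, db.DB_QUEUE, 8) come from BerkeleyBase.py, which is not part of this diff. A hedged sketch of what such a helper might look like, assuming it takes optional set_flags() flags, a DB type, and a fixed record length, and that it registers each opened table for the base-class close() (the _tables attribute is illustrative):

    # Hedged sketch, not the actual BerkeleyBase._setupDB() implementation.
    def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
        """Open a named table in the shared environment and remember it."""
        d = db.DB(self._env)
        if flags:
            d.set_flags(flags)
        if reclen is not None:
            # DB_QUEUE tables require fixed-length records.
            d.set_re_len(reclen)
        d.open(self._prefix + name, dbtype, db.DB_CREATE)
        # Remember the table so the base-class close() can close it later.
        self._tables.append(d)
        return d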
@@ -142,24 +139,23 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
             self._autopacker.start()
 
     def close(self):
-        # We must stop the autopacker first before closing any tables. BAW:
-        # should we use a timeout on the join() call? I'm not sure. On the
-        # one hand we don't want to block forever, but on the other, killing
-        # the autopacker thread in the middle of real work could leave the
-        # databases in a corrupted state, requiring recovery. With a
-        # AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
-        if self._autopacker:
-            zLOG.LOG('Minimal storage', zLOG.INFO, 'stopping autopack thread')
-            self._autopacker.stop()
-            self._autopacker.join()
-        self._serials.close()
-        self._pickles.close()
-        self._refcounts.close()
-        self._oids.close()
-        self._pending.close()
-        self._packmark.close()
-        self._oidqueue.close()
-        BerkeleyBase.close(self)
+        # Set this flag before acquiring the lock so we don't block waiting
+        # for the autopack thread to give up the lock.
+        self._stop = True
+        self._lock_acquire()
+        try:
+            # We must stop the autopacker and checkpointing threads first
+            # before closing any tables. I'm not sure about the join()
+            # timeout, but I'd be surprised if any particular iteration of a
+            # pack-related loops take longer than a few seconds.
+            if self._autopacker:
+                zLOG.LOG('Minimal storage', zLOG.INFO,
+                         'stopping autopack thread')
+                self._autopacker.stop()
+                self._autopacker.join(30)
+            BerkeleyBase.close(self)
+        finally:
+            self._lock_release()
 
     def _doabort(self, txn, tid):
         co = cs = None
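The simplification of close() relies, per the commit message, on the base class maintaining its own list of opened tables and closing them in turn. That bookkeeping lives in BerkeleyBase.py and is not shown here; a hedged sketch of the idea (again using the illustrative _tables list from the sketch above):

    # Hedged sketch of the base-class behavior this change depends on; the
    # real BerkeleyBase.close() may differ in detail.
    def close(self):
        # Close every table registered by _setupDB(), then the environment,
        # so subclasses no longer have to enumerate their tables by hand.
        for d in self._tables:
            d.close()
        self._env.close()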
@@ -440,6 +436,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
         # root references, and then for each of those, find all the objects it
         # references, and so on until we've traversed the entire object graph.
         while oid:
+            if self._stop:
+                raise PackStop, 'stopped in _mark()'
             if not self._packmark.has_key(oid):
                 # We've haven't yet seen this object
                 self._packmark.put(oid, PRESENT, txn=txn)
@@ -467,6 +465,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
         try:
             rec = c.first()
             while rec:
+                if self._stop:
+                    raise PackStop, 'stopped in _sweep()'
                 oid = rec[0]
                 rec = c.next()
                 # If packmark (which knows about all the root reachable
@@ -482,6 +482,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
     def _collect_objs(self, txn):
         orec = self._oidqueue.consume()
         while orec:
+            if self._stop:
+                raise PackStop, 'stopped in _collect_objs()'
             oid = orec[1]
             serial = self._getCurrentSerial(oid)
             # Delete the object from the serials table
@@ -492,6 +494,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
             except db.DBNotFoundError:
                 rec = None
             while rec and rec[0] == oid:
+                if self._stop:
+                    raise PackStop, 'stopped in _collect_objs() loop 1'
                 c.delete()
                 rec = c.next_dup()
             # We don't need the refcounts any more, but note that if the
@@ -511,6 +515,8 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
             except db.DBNotFoundError:
                 rec = None
             while rec and rec[0][:8] == oid:
+                if self._stop:
+                    raise PackStop, 'stopped in _collect_objs() loop 2'
                 data = rec[1]
                 c.delete()
                 rec = c.next()
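All of the hunks above implement the same escape hatch described in the commit message: each inner pack loop polls self._stop and raises PackStop as soon as stop() has been requested, rather than finishing the loop first. Presumably the pack driver catches PackStop and aborts the in-progress transaction; a hedged sketch of that pattern (not the actual pack() code):

    # Illustrative only: how a pack() driver might use the escape hatch.
    def pack(self, t, zreferencesf):
        txn = self._env.txn_begin()
        try:
            self._mark(txn)          # each loop checks self._stop and
            self._sweep(txn)         # raises PackStop when stop() is seen
            self._collect_objs(txn)
        except PackStop:
            # The main thread asked us to shut down mid-pack; undo the
            # partial work instead of waiting for the loops to finish.
            txn.abort()
        else:
            txn.commit()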
@@ -555,28 +561,10 @@ class Minimal(BerkeleyBase, ConflictResolvingStorage):
-class _Autopack(threading.Thread):
+class _Autopack(_WorkThread):
     def __init__(self, storage, frequency):
-        threading.Thread.__init__(self)
-        self._storage = storage
-        self._frequency = frequency
-        # Bookkeeping
-        self._stop = False
-        self._nextpack = 0
-
-    def run(self):
-        zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread started')
-        while not self._stop:
-            now = time.time()
-            if now > self._nextpack:
-                # Run the autopack phase
-                self._storage.pack('ignored', referencesf)
-                self._nextpack = now + self._frequency
-            # Now we sleep for a little while before we check again. Sleep
-            # for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so as
-            # to be as responsive as possible to .stop() calls.
-            time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
-        zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread finished')
-
-    def stop(self):
-        self._stop = True
+        _WorkThread.__init__(self, storage, frequency, 'autopacking')
+
+    def _dowork(self, now):
+        # Run the autopack phase
+        self._storage.pack('ignored', referencesf)
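_WorkThread itself is defined in BerkeleyBase.py and does not appear in this diff. Judging from how _Autopack uses it (a constructor taking the storage, a frequency, and a label; a _dowork(now) hook; and the stop()/join(30) calls in close()), and from the old run() loop it replaces, a base class along these lines would fit. This is a hedged sketch, not the real implementation:

    import threading
    import time

    class _WorkThread(threading.Thread):
        # How often to wake up and re-check the stop flag (illustrative).
        CHECK_SLEEP = 60

        def __init__(self, storage, frequency, name='work'):
            threading.Thread.__init__(self)
            self._storage = storage
            self._frequency = frequency
            self._name = name
            self._stop = False
            self._nextcheck = 0

        def run(self):
            while not self._stop:
                now = time.time()
                if now > self._nextcheck:
                    self._dowork(now)
                    self._nextcheck = now + self._frequency
                # Sleep in short increments so stop() is honored promptly.
                time.sleep(min(self._frequency, self.CHECK_SLEEP))

        def _dowork(self, now):
            # Subclasses (e.g. _Autopack) override this with the real work.
            raise NotImplementedError

        def stop(self):
            self._stop = True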