Commit ca8e8bf1 authored by Barry Warsaw

Several improvements to pack(), which should be good enough for the

1.0 release.  These changes pass all the tests, but the proof will be
packing some real data (to come next -- this stuff has to be checked
in first).  Specifically,

__init__(): We now allocate a pack-lock which is acquired in pack() to
prevent multiple threads from packing at the same time.  This should
be deadlock-proof because you should never have the storage lock when
you want to acquire the pack lock (but I could be missing something
obvious).
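
For illustration, a minimal sketch of that lock ordering, using the standard threading module in place of ThreadLock and a plain Lock standing in for the storage lock managed by _lock_acquire()/_lock_release(); the empty _dopack() body here is hypothetical:

import threading

class PackLockOrderingSketch:
    # Sketch only: pack() takes the pack lock first and never while holding
    # the storage lock, so the two locks are always acquired in the same
    # order (pack lock, then storage lock inside _dopack()).
    def __init__(self):
        self._packlock = threading.Lock()     # stands in for ThreadLock.allocate_lock()
        self._storagelock = threading.Lock()  # stands in for the storage lock

    def pack(self, t, referencesf):
        self._packlock.acquire()
        try:
            self._dopack(t, referencesf)
        finally:
            self._packlock.release()

    def _dopack(self, t, referencesf):
        # The storage lock is only taken around the actual zapping work.
        self._storagelock.acquire()
        try:
            pass  # zap one object's packable revisions here
        finally:
            self._storagelock.release()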

_rootreachable(): New method which calculates the set of object ids
reachable from the current revision of the root object.
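
As a rough sketch of that traversal (the get_refs callable, standing in for applying referencesf() to an object's current pickle, and the root_oid value are hypothetical):

def root_reachable(get_refs, root_oid):
    # Iterative walk over the object graph, starting from the current
    # revision of the root object; no recursion is needed.
    reachables = {}
    lookingat = {root_oid: 1}
    while lookingat:
        oid, ignore = lookingat.popitem()
        if oid in reachables:
            continue
        reachables[oid] = 1
        # get_refs(oid) -> oids referenced by the object's current pickle
        for ref in get_refs(oid):
            if ref not in reachables:
                lookingat[ref] = 1
    return reachables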

_zapobject(): Rewritten to remove the recursion.  It populates a
dictionary (passed in as a parameter) with the oids of any objects
whose refcounts went to zero because of the object we're now zapping.

_zaprevision(): Same deal as _zapobject().
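
Roughly, both methods rely on the same worklist pattern in place of recursion; a simplified sketch, with refcounts and references as hypothetical stand-ins for the storage's refcount and pickle-reference tables:

def collect(refcounts, references, decrefoids):
    # decrefoids maps oid -> 1 for every object whose refcount has dropped
    # to zero.  Zapping one object decrefs everything it references, which
    # may add more oids to the worklist -- all without recursion.
    zapped = []
    while decrefoids:
        oid, ignore = decrefoids.popitem()
        zapped.append(oid)
        for ref in references.get(oid, []):
            refcounts[ref] = refcounts[ref] - 1
            if refcounts[ref] == 0:
                decrefoids[ref] = 1
    return zapped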

_dopack(): The guts of the pack() operation, done this way so pack()
itself can just be a wrapper around _dopack() with pack lock
acquisition/release.  This also does not acquire the storage lock
until it actually has to start zapping revisions, and it releases and
reacquires the lock for each revision it zaps.  This gives other
threads an opportunity to do work during a pack, which should never
impact the pack as any work they do will happen in the future (and
we're safe against bogus future pack times).
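
The locking pattern can be sketched as follows (packablerevs, the lock object, and the zaprevision callable are hypothetical parameters here):

def zap_in_batches(lock, packablerevs, zaprevision):
    # Hold the storage lock only while zapping one object's revisions,
    # releasing it between objects so other threads can make progress
    # while the pack is running.
    for oid, keys in packablerevs.items():
        lock.acquire()
        try:
            for key in keys:
                zaprevision(key)
        finally:
            lock.release()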

pack(): Acquire the pack lock, _dopack(), release the pack lock.
parent b4fa6640
@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support
 undo or versioning.
 """
-__version__ = '$Revision: 1.33 $'.split()[-2:][0]
+__version__ = '$Revision: 1.34 $'.split()[-2:][0]
 
 import sys
 import struct
@@ -19,6 +19,7 @@ from ZODB.utils import p64, U64
 from ZODB.referencesf import referencesf
 from ZODB.TimeStamp import TimeStamp
 from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
+import ThreadLock
 
 # BerkeleyBase.BerkeleyBase class provides some common functionality for both
 # the Full and Minimal implementations.  It in turn inherits from
@@ -46,6 +47,10 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
     #
     # Overrides of base class methods
     #
+    def __init__(self, name, env=None, prefix='zodb_'):
+        self._packlock = ThreadLock.allocate_lock()
+        BerkeleyBase.__init__(self, name, env, prefix)
+
     def _setupDBs(self):
         # Data Type Assumptions:
         #
@@ -928,25 +933,52 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         finally:
             self._lock_release()
 
-    def _zapobject(self, oid, referencesf):
+    def _rootreachable(self, referencesf):
+        # Quick exit for empty storages
+        if not self._serials:
+            return {}
+        reachables = {}
+        seen = {}
+        lookingat = {ZERO: 1}
+        # Start at the root, find all the objects the current revision of the
+        # root references, and then for each of those, find all the objects it
+        # references, and so on until we've traversed the entire object graph.
+        while lookingat:
+            oid, ignore = lookingat.popitem()
+            # Don't look at objects we've already seen
+            if seen.has_key(oid):
+                continue
+            seen[oid] = 1
+            reachables[oid] = 1
+            # Get the pickle data for the object's current version
+            revid = self._serials.get(oid)
+            lrevid = self._metadata[oid+revid][16:24]
+            pickle = self._pickles[oid+lrevid]
+            refdoids = []
+            referencesf(pickle, refdoids)
+            # BAW: recursion might be bad, but it's easy to implement. :(
+            for oid in refdoids:
+                lookingat[oid] = 1
+        return reachables
+
+    def _zapobject(self, oid, decrefoids, referencesf):
         # Delete all records referenced by this object
-        try:
-            self._serials.delete(oid)
-        except db.DBNotFoundError:
-            pass
+        self._serials.delete(oid)
+        # This oid should not be in the decrefoids set because it would have
+        # had to have been popitem()'d off to even get here.  Let's just make
+        # sure of that invariant...
+        assert not decrefoids.has_key(oid)
+        # We don't need to track reference counts to this object anymore
         self._refcounts.delete(oid)
         # Run through all the metadata records associated with this object,
-        # and recursively zap all its revisions.  Keep track of the tids and
-        # vids referenced by the metadata record, so that we can clean up the
-        # txnoids and currentVersions tables.
+        # and iterate through all its revisions, zapping them.  Keep track of
+        # the tids and vids referenced by the metadata record, so we can clean
+        # up the txnoids and currentVersions tables too.
         tids = {}
         vids = {}
         c = self._metadata.cursor()
         try:
-            try:
-                rec = c.set_range(oid)
-            except db.DBNotFoundError:
-                return
+            rec = c.set_range(oid)
             while rec and rec[0][:8] == oid:
                 key, data = rec
                 rec = c.next()
@@ -955,7 +987,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
                 vid = data[:8]
                 if vid <> ZERO:
                     vids[vid] = 1
-                self._zaprevision(key, referencesf)
+                self._zaprevision(key, decrefoids, referencesf)
         finally:
             c.close()
         # Zap all the txnoid records...
@@ -985,140 +1017,164 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
         finally:
             c.close()
         # ... now for each vid, delete the versions->vid and vid->versions
-        # mapping if we've deleted all references to this vid
+        # mapping if we've deleted all references to this vid.
         for vid in vids.keys():
             if self._currentVersions.get(vid):
                 continue
-            version = self._versions[vid]
+            version = self._versions.get(vid)
             self._versions.delete(vid)
             self._vids.delete(version)
 
-    def _zaprevision(self, key, referencesf):
-        # Delete the metadata record pointed to by the key, decrefing the
-        # reference counts of the pickle pointed to by this record, and
-        # perform cascading decrefs on the referenced objects.
-        #
-        # We need the lrevid which points to the pickle for this revision.
-        try:
-            lrevid = self._metadata[key][16:24]
-        except KeyError:
-            return
+    def _zaprevision(self, key, decrefoids, referencesf):
+        # For each revision we're going to delete, we need to decref the
+        # pickle this revision points at.  If that pickle gets decref'd to
+        # zero, then we know we can reclaim all of those records too.  But
+        # zapping a pickle means we then have to decref all the objects that
+        # that pickle references, and so on...
+        lrevid = self._metadata.get(key)[16:24]
         # Decref the reference count of the pickle pointed to by oid+lrevid.
         # If the reference count goes to zero, we can garbage collect the
         # pickle, and decref all the objects pointed to by the pickle (with of
         # course, cascading garbage collection).
         pkey = key[:8] + lrevid
-        refcount = U64(self._pickleRefcounts[pkey]) - 1
+        refcount = U64(self._pickleRefcounts.get(pkey)) - 1
         if refcount > 0:
+            # Not decref'd to zero, so just store the refcount back in
             self._pickleRefcounts.put(pkey, p64(refcount))
         else:
-            # The refcount of this pickle has gone to zero, so we need to
-            # garbage collect it, and decref all the objects it points to.
-            pickle = self._pickles[pkey]
+            # Decref'd to zero, so decref all the objects that this pickle
+            # refers to, and delete the pickle entries.  If the objects
+            # themselves get decref'd to zero, add them to the decrefoids set
+            # for later collection
+            pickle = self._pickles.get(pkey)
             # Sniff the pickle to get the objects it refers to
             refoids = []
             referencesf(pickle, refoids)
-            # Now decref the reference counts for each of those objects.  If
-            # the object's refcount goes to zero, remember the oid so we can
-            # recursively zap its metadata records too.
-            collectables = {}
             for oid in refoids:
-                refcount = U64(self._refcounts[oid]) - 1
+                refcount = U64(self._refcounts.get(oid)) - 1
                 if refcount > 0:
                     self._refcounts.put(oid, p64(refcount))
                 else:
-                    collectables[oid] = 1
-            # Garbage collect all objects with refcounts that just went to
-            # zero.
-            for oid in collectables.keys():
-                self._zapobject(oid, referencesf)
-            # We can now delete both the pickleRefcounts and pickle entry for
-            # this garbage collected pickle.
+                    decrefoids[oid] = 1
+            # We're done with the pickleRefcounts and pickle entries for this
+            # garbage collected pickle data.
             self._pickles.delete(pkey)
             self._pickleRefcounts.delete(pkey)
-        # We can now delete this metadata record.
+        # Now we're done with this metadata record
        self._metadata.delete(key)
+        # We'll erase the knowledge that this tid touched this object
+        tid = key[8:]
+        c = self._txnoids.cursor()
+        try:
+            # Yes, the key here is the tid, which is the second 8 bytes, while
+            # the value is the oid, which is the first 8 bytes.
+            c.set_both(tid, key[:8])
+            c.delete()
+        finally:
+            c.close()
+        # Now, we've either done one of two things: 1) we've removed the last
+        # record of an oid being modified in this transaction, in which case
+        # we can garbage collec the txnMetadata because there aren't any
+        # object revisions that reference this transaction.  OR, 2) there are
+        # still uncollected objects in this transaction, but we must mark this
+        # transaction as packed() so that undo can't possibly introduce a
+        # temporal anomoly.
+        if self._txnoids.has_key(tid):
+            meta = self._txnMetadata.get(tid)[1:]
+            self._txnMetadata.put(tid, PROTECTED_TRANSACTION + meta)
+        else:
+            self._txnMetadata.delete(tid)
 
-    def pack(self, t, referencesf):
-        # BAW: This doesn't play nicely if you enable the `debugging revids'
-        #
-        # t is a TimeTime, or time float, convert this to a TimeStamp object,
-        # using an algorithm similar to what's used in FileStorage.  The
-        # TimeStamp can then be used as a key in the txnMetadata table, since
-        # we know our revision id's are actually TimeStamps.
-        t0 = TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
-        self._lock_acquire()
-        c = None
-        tidmarks = {}
-        oids = {}
-        try:
-            # Figure out when to pack to.  We happen to know that our
-            # transaction ids are really timestamps.
-            c = self._txnoids.cursor()
-            # Need to use the repr of the TimeStamp so we get a string
-            try:
-                rec = c.set_range(`t0`)
-            except db.DBNotFoundError:
-                rec = c.last()
-            while rec:
-                tid, oid = rec
-                rec = c.prev()
-                # We need to mark this transaction as having participated in a
-                # pack, so that undo will not create a temporal anomaly.
-                if not tidmarks.has_key(tid):
-                    meta = self._txnMetadata[tid]
-                    # Has this transaction already been packed?  If so, we can
-                    # stop here... I think!
-                    if meta[0] == PROTECTED_TRANSACTION:
-                        break
-                    self._txnMetadata[tid] = PROTECTED_TRANSACTION + meta[1:]
-                    tidmarks[tid] = 1
-                # For now, just remember which objects are touched by the
-                # packable
-                if oid <> ZERO:
-                    oids[oid] = 1
-            # Now look at every object revision metadata record for the
-            # objects that have been touched in the packable transactions.  If
-            # the metadata record points at the current revision of the
-            # object, ignore it, otherwise reclaim it.
-            c.close()
-            c = self._metadata.cursor()
-            for oid in oids.keys():
-                try:
-                    current = self._serials[oid]
-                except KeyError:
-                    continue
-                try:
-                    rec = c.set_range(oid)
-                except db.DBNotFoundError:
-                    continue
-                while rec:
-                    key, data = rec
-                    rec = c.next()
-                    if key[:8] <> oid:
-                        break
-                    if key[8:] == current:
-                        continue
-                    self._zaprevision(key, referencesf)
-                # Now look and see if the object has a reference count of
-                # zero, and if so garbage collect it.  refcounts will be None
-                # if the reference count of this object is zero, i.e. it won't
-                # be in the table.
-                refcounts = self._refcounts.get(oid)
-                if not refcounts:
-                    # The current revision should be the only revision of this
-                    # object that exists, otherwise its refcounts shouldn't be
-                    # zero.
-                    self._zaprevision(oid+current, referencesf)
-                    # And delete a few other records that _zaprevisions()
-                    # doesn't clean up
-                    self._serials.delete(oid)
-                if refcounts is not None:
-                    self._refcounts.delete(oid)
-        finally:
-            if c:
-                c.close()
-            self._lock_release()
+    def _dopack(self, t, referencesf):
+        # t is a TimeTime, or time float, convert this to a TimeStamp object,
+        # using an algorithm similar to what's used in FileStorage.  We know
+        # that our transaction ids, a.k.a. revision ids, are timestamps.  BAW:
+        # This doesn't play nicely if you enable the `debugging revids'
+        #
+        # BAW: should a pack time in the future be a ValueError?  We'd have to
+        # worry about clock skew, so for now, we just set the pack time to the
+        # minimum of t and now.
+        packtime = min(t, time.time())
+        t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
+        packtid = `t0`
+        # Calculate the set of objects reachable from the root.  Anything else
+        # is a candidate for having all their revisions packed away.
+        reachables = self._rootreachable(referencesf)
+        # We now cruise through all the objects we know about, i.e. the keys
+        # of the serials table, looking at all the object revisions earlier
+        # than the pack time.  If the revision is not the current revision,
+        # then it's a packable revision.  We employ a BDB trick of set_range()
+        # to give us the smallest record greater than or equal to the one we
+        # ask for.  We move to the one just before that, and cruise backwards.
+        #
+        # This should also make us immune to evil future-pack time values,
+        # although it would still be better to raise a ValueError in those
+        # situations.  This is a dictionary keyed off the object id, with
+        # values which are a list of revisions (oid+tid) that can be packed.
+        packablerevs = {}
+        c = self._metadata.cursor()
+        # BAW: can two threads be packing at the same time?  If so, we need to
+        # handle that.  If not, we should enforce that with a pack-lock.
+        for oid in self._serials.keys():
+            try:
+                rec = c.set_range(oid+packtid)
+                # The one just before this should be the largest record less
+                # than or equal to the key, i.e. the object revision just
+                # before the given pack time.
+                rec = c.prev()
+            except db.DBNotFoundError:
+                # Perhaps the last record in the database is the last one
+                # containing this oid?
+                rec = c.last()
+            # Now move backwards in time to look at all the revisions of this
+            # object.  All but the current one are packable, unless the object
+            # isn't reachable from the root, in which case, all its revisions
+            # are packable.
+            while rec:
+                key, data = rec
+                rec = c.prev()
+                # Make sure we're still looking at revisions for this object
+                if oid <> key[:8]:
+                    break
+                if not reachables.has_key(oid):
+                    packablerevs.setdefault(oid, []).append(key)
+                # Otherwise, if this isn't the current revision for this
+                # object, then it's packable.
+                elif self._serials[oid] <> key[8:]:
+                    packablerevs.setdefault(oid, []).append(key)
+        # We now have all the packable revisions we're going to handle.  For
+        # each object with revisions that we're going to pack away, acquire
+        # the storage lock so we can do that without fear of trampling by
+        # other threads (i.e. interaction of transactionalUndo() and pack()).
+        #
+        # This set contains the oids of all objects that have been decref'd
+        # to zero by the pack operation.  To avoid recursion, we'll just note
+        # them now and handle them in a loop later.
+        #
+        # BAW: should packs be transaction protected?
+        decrefoids = {}
+        for oid in packablerevs.keys():
+            self._lock_acquire()
+            try:
+                for key in packablerevs[oid]:
+                    self._zaprevision(key, decrefoids, referencesf)
+            finally:
+                self._lock_release()
+        # While there are still objects to collect, continue to do so.
+        # Note that collecting an object may reveal more objects that are
+        # dec refcounted to zero.
+        while decrefoids:
+            oid, ignore = decrefoids.popitem()
+            self._zapobject(oid, decrefoids, referencesf)
+
+    def pack(self, t, referencesf):
+        # A simple wrapper around the bulk of packing, but which acquires a
+        # lock that prevents multiple packs from running at the same time.
+        self._packlock.acquire()
+        try:
+            self._dopack(t, referencesf)
+        finally:
+            self._packlock.release()
 
     # GCable interface, for cyclic garbage collection
     #