Commit 8dc273e7 authored by Barry Warsaw's avatar Barry Warsaw

checkRootUnreachable(), checkCycleUnreachable(): Distinguish tests

between objects which are unreachable from the root -- and thus might
be collected by autopack -- and cycles which are unreachable from the
root -- which can only be collected by a full pack.
parent ba2387f8
......@@ -16,11 +16,21 @@ import os
import time
import unittest
from ZODB import DB
from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
from Persistence import Persistent
from bsddb3Storage.Full import Full
from bsddb3Storage.Minimal import Minimal
from bsddb3Storage.BerkeleyBase import BerkeleyConfig
from bsddb3Storage.tests.BerkeleyTestBase import BerkeleyTestBase
ZERO = '\0'*8
class C(Persistent):
pass
class TestAutopackBase(BerkeleyTestBase):
......@@ -43,14 +53,15 @@ class TestAutopackBase(BerkeleyTestBase):
# Create the storage
os.mkdir(dir)
try:
return Full(dir, config=self._config())
return self.ConcreteStorage(dir, config=self._config())
except:
self._zap_dbhome(dir)
raise
class TestAutopack(TestAutopackBase):
ConcreteStorage = Full
def checkAutopack(self):
unless = self.failUnless
raises = self.assertRaises
......@@ -82,6 +93,8 @@ class TestAutopack(TestAutopackBase):
class TestAutomaticClassicPack(TestAutopackBase):
ConcreteStorage = Full
def _config(self):
config = BerkeleyConfig()
# Autopack every 3 seconds, 6 seconds into the past, no classic packs
......@@ -114,9 +127,128 @@ class TestAutomaticClassicPack(TestAutopackBase):
# The first two revisions should now be gone, but the third should
# still exist because it's the current revision, and we haven't done a
# classic pack.
raises(KeyError, self._storage.loadSerial, oid, revid1)
raises(KeyError, self._storage.loadSerial, oid, revid2)
raises(KeyError, self._storage.loadSerial, oid, revid3)
raises(KeyError, storage.loadSerial, oid, revid1)
raises(KeyError, storage.loadSerial, oid, revid2)
raises(KeyError, storage.loadSerial, oid, revid3)
def checkCycleUnreachable(self):
unless = self.failUnless
raises = self.assertRaises
storage = self._storage
db = DB(storage)
conn = db.open()
root = conn.root()
self._wait_for_next_autopack()
# Store an object that's reachable from the root
obj1 = C()
obj2 = C()
obj1.obj = obj2
obj2.obj = obj1
root.obj = obj1
txn = get_transaction()
txn.note('root -> obj1 <-> obj2')
txn.commit()
oid1 = obj1._p_oid
oid2 = obj2._p_oid
assert oid1 and oid2 and oid1 <> oid2
self._wait_for_next_autopack()
unless(storage.load(ZERO, ''))
unless(storage.load(oid1, ''))
unless(storage.load(oid2, ''))
# Now unlink it, which should still leave obj1 and obj2 alive
del root.obj
txn = get_transaction()
txn.note('root -X-> obj1 <-> obj2')
txn.commit()
unless(storage.load(ZERO, ''))
unless(storage.load(oid1, ''))
unless(storage.load(oid2, ''))
# Do an explicit full pack to right now to collect all the old
# revisions and the cycle.
storage.pack(time.time(), referencesf)
# And it should be packed away
unless(storage.load(ZERO, ''))
raises(KeyError, storage.load, oid1, '')
raises(KeyError, storage.load, oid2, '')
class TestMinimalPack(TestAutopackBase):
ConcreteStorage = Minimal
def _config(self):
config = BerkeleyConfig()
# Autopack every 3 seconds
config.frequency = 3
return config
def checkRootUnreachable(self):
unless = self.failUnless
raises = self.assertRaises
storage = self._storage
db = DB(storage)
conn = db.open()
root = conn.root()
self._wait_for_next_autopack()
# Store an object that's reachable from the root
obj = C()
obj.value = 999
root.obj = obj
txn = get_transaction()
txn.note('root -> obj')
txn.commit()
oid = obj._p_oid
assert oid
self._wait_for_next_autopack()
unless(storage.load(ZERO, ''))
unless(storage.load(oid, ''))
# Now unlink it
del root.obj
txn = get_transaction()
txn.note('root -X-> obj')
txn.commit()
# The object should be gone due to reference counting
unless(storage.load(ZERO, ''))
raises(KeyError, storage.load, oid, '')
def checkCycleUnreachable(self):
unless = self.failUnless
raises = self.assertRaises
storage = self._storage
db = DB(storage)
conn = db.open()
root = conn.root()
self._wait_for_next_autopack()
# Store an object that's reachable from the root
obj1 = C()
obj2 = C()
obj1.obj = obj2
obj2.obj = obj1
root.obj = obj1
txn = get_transaction()
txn.note('root -> obj1 <-> obj2')
txn.commit()
oid1 = obj1._p_oid
oid2 = obj2._p_oid
assert oid1 and oid2 and oid1 <> oid2
self._wait_for_next_autopack()
unless(storage.load(ZERO, ''))
unless(storage.load(oid1, ''))
unless(storage.load(oid2, ''))
# Now unlink it, which should still leave obj1 and obj2 alive
del root.obj
txn = get_transaction()
txn.note('root -X-> obj1 <-> obj2')
txn.commit()
unless(storage.load(ZERO, ''))
unless(storage.load(oid1, ''))
unless(storage.load(oid2, ''))
# But the next autopack should collect both obj1 and obj2
self._wait_for_next_autopack()
# And it should be packed away
unless(storage.load(ZERO, ''))
raises(KeyError, storage.load, oid1, '')
raises(KeyError, storage.load, oid2, '')
......@@ -124,6 +256,7 @@ def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestAutopack, 'check'))
suite.addTest(unittest.makeSuite(TestAutomaticClassicPack, 'check'))
suite.addTest(unittest.makeSuite(TestMinimalPack, 'check'))
return suite
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment