Commit 9509d1a6 authored by Tres Seaver's avatar Tres Seaver

Fix Py3k nannyism: 'assert_' -> 'assertTrue'.

parent 67f90d1b
......@@ -169,7 +169,7 @@ class BasicStorage:
# of this number
self._dostore(data=MinPO(22))
self._dostore(data=MinPO(23))
self.assert_(len(self._storage) in [0,2])
self.assertTrue(len(self._storage) in [0,2])
def checkGetSize(self):
self._dostore(data=MinPO(25))
......@@ -233,10 +233,10 @@ class BasicStorage:
self._storage.checkCurrentSerialInTransaction(oid, tid, t)
self._storage.tpc_vote(t)
except POSException.ReadConflictError as v:
self.assert_(v.oid) == oid
self.assert_(v.serials == (tid2, tid))
self.assertTrue(v.oid) == oid
self.assertTrue(v.serials == (tid2, tid))
else:
if 0: self.assert_(False, "No conflict error")
if 0: self.assertTrue(False, "No conflict error")
self._storage.tpc_abort(t)
......@@ -269,7 +269,7 @@ class BasicStorage:
thread.join(33)
tid3 = self._storage.load(oid)[1]
self.assert_(tid3 > self._storage.load(b'\0\0\0\0\0\0\0\xf3')[1])
self.assertTrue(tid3 > self._storage.load(b'\0\0\0\0\0\0\0\xf3')[1])
#----------------------------------------------------------------------
# non-stale competing trans after checkCurrentSerialInTransaction
......@@ -295,7 +295,8 @@ class BasicStorage:
self._storage.tpc_finish(t)
thread.join()
tid4 = self._storage.load(oid)[1]
self.assert_(tid4 > self._storage.load(b'\0\0\0\0\0\0\0\xf4')[1])
self.assertTrue(tid4 >
self._storage.load(b'\0\0\0\0\0\0\0\xf4')[1])
def check_tid_ordering_w_commit(self):
......
......@@ -92,7 +92,7 @@ class ConflictResolvingStorage:
try:
self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
except ConflictError as err:
self.assert_("PCounter2" in str(err))
self.assertTrue("PCounter2" in str(err))
else:
self.fail("Expected ConflictError")
......
......@@ -233,7 +233,7 @@ class IteratorDeepCompare:
storage2.loadBlob, rec1.oid, rec1.tid)
else:
fn2 = storage2.loadBlob(rec1.oid, rec1.tid)
self.assert_(fn1 != fn2)
self.assertTrue(fn1 != fn2)
eq(open(fn1, 'rb').read(), open(fn2, 'rb').read())
# Make sure there are no more records left in rec1 and rec2,
......
......@@ -27,7 +27,7 @@ class ReadOnlyStorage:
def _make_readonly(self):
self._storage.close()
self.open(read_only=True)
self.assert_(self._storage.isReadOnly())
self.assertTrue(self._storage.isReadOnly())
def checkReadMethods(self):
self._create_data()
......
......@@ -160,8 +160,8 @@ class RecoveryStorage(IteratorDeepCompare):
# transaction. Without the patch, the second assert failed
# (it claimed it couldn't find a data record for obj2) on my
# box, but other failure modes were possible.
self.assert_(self._storage._data_find(pos, obj1_oid, '') > 0)
self.assert_(self._storage._data_find(pos, obj2_oid, '') > 0)
self.assertTrue(self._storage._data_find(pos, obj1_oid, '') > 0)
self.assertTrue(self._storage._data_find(pos, obj2_oid, '') > 0)
# The offset of the next ("redo") transaction.
pos = self._storage.getSize()
......@@ -182,8 +182,8 @@ class RecoveryStorage(IteratorDeepCompare):
if is_filestorage:
# Again _data_find should find both objects in this txn, and
# again the second assert failed on my box.
self.assert_(self._storage._data_find(pos, obj1_oid, '') > 0)
self.assert_(self._storage._data_find(pos, obj2_oid, '') > 0)
self.assertTrue(self._storage._data_find(pos, obj1_oid, '') > 0)
self.assertTrue(self._storage._data_find(pos, obj2_oid, '') > 0)
# Indirectly provoke .restore(). .restore in turn indirectly
# provokes _data_find too, but not usefully for the purposes of
......
......@@ -62,7 +62,7 @@ class RevisionStorage:
assert prev < middle < cur # else the snooze() trick failed
prev = cur
t = self._storage.loadBefore(oid, p64(middle))
self.assert_(t is not None)
self.assertTrue(t is not None)
data, start, end = t
self.assertEqual(revs[i-1][0], data)
self.assertEqual(tid, end)
......@@ -131,7 +131,7 @@ class RevisionStorage:
self.assertEqual(data, t[0])
self.assertEqual(tid, t[1])
if prev_tid:
self.assert_(prev_tid < t[1])
self.assertTrue(prev_tid < t[1])
prev_tid = t[1]
if i < 3:
self.assertEqual(revs[i+1][1], t[2])
......
......@@ -230,5 +230,5 @@ class StorageTestBase(ZODB.tests.util.TestCase):
oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids))
for oid in expected_oids:
self.assert_(oid in oids)
self.assertTrue(oid in oids)
return self._storage.lastTransaction()
......@@ -376,7 +376,8 @@ class TransactionalUndoStorage:
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self.assertRaises(POSException.UndoError, self._begin_undos_vote, t, tid)
self.assertRaises(POSException.UndoError,
self._begin_undos_vote, t, tid)
self._storage.tpc_abort(t)
# Now have more fun: object1 and object2 are in the same transaction,
# which we'll try to undo to, but one of them has since modified in
......@@ -412,7 +413,8 @@ class TransactionalUndoStorage:
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self.assertRaises(POSException.UndoError, self._begin_undos_vote, t, tid)
self.assertRaises(POSException.UndoError,
self._begin_undos_vote, t, tid)
self._storage.tpc_abort(t)
self._iterate()
......@@ -545,7 +547,7 @@ class TransactionalUndoStorage:
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key2"]))
self.assertTrue(listeq(root.keys(), ["key0", "key2"]))
L = db.undoInfo()
db.undo(L[0]["id"])
......@@ -557,14 +559,14 @@ class TransactionalUndoStorage:
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key1", "key2"]))
self.assertTrue(listeq(root.keys(), ["key0", "key1", "key2"]))
for t in pack_times:
self._storage.pack(t, referencesf)
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key1", "key2"]))
self.assertTrue(listeq(root.keys(), ["key0", "key1", "key2"]))
for i in range(3):
obj = root["key%d" % i]
self.assertEqual(obj.value, i)
......
......@@ -60,7 +60,7 @@ class Tests(unittest.TestCase):
time.sleep(0.2)
c._transferred(3, 7)
am.closedConnection(c)
self.assert_(len(am.log) <= 1)
self.assertTrue(len(am.log) <= 1)
def testSetHistoryLength(self):
am = ActivityMonitor(history_length=3600)
......@@ -73,7 +73,7 @@ class Tests(unittest.TestCase):
self.assertEqual(len(am.log), 2)
am.setHistoryLength(0.1)
self.assertEqual(am.getHistoryLength(), 0.1)
self.assert_(len(am.log) <= 1)
self.assertTrue(len(am.log) <= 1)
def testActivityAnalysis(self):
am = ActivityMonitor(history_length=3600)
......@@ -88,16 +88,16 @@ class Tests(unittest.TestCase):
div = res[n]
self.assertEqual(div['stores'], 0)
self.assertEqual(div['loads'], 0)
self.assert_(div['start'] > 0)
self.assert_(div['start'] >= lastend)
self.assert_(div['start'] < div['end'])
self.assertTrue(div['start'] > 0)
self.assertTrue(div['start'] >= lastend)
self.assertTrue(div['start'] < div['end'])
lastend = div['end']
div = res[9]
self.assertEqual(div['stores'], 9)
self.assertEqual(div['loads'], 4)
self.assert_(div['start'] > 0)
self.assert_(div['start'] >= lastend)
self.assert_(div['start'] < div['end'])
self.assertTrue(div['start'] > 0)
self.assertTrue(div['start'] >= lastend)
self.assertTrue(div['start'] < div['end'])
def test_suite():
......
......@@ -105,7 +105,7 @@ class DBMethods(CacheTestBase):
expected = ['conn_no', 'id', 'oid', 'rc', 'klass', 'state']
for dict in self.db.cacheExtremeDetail():
for k, v in dict.items():
self.assert_(k in expected)
self.assertTrue(k in expected)
# TODO: not really sure how to do a black box test of the cache.
# Should the full sweep and minimize calls always remove things?
......@@ -114,13 +114,13 @@ class DBMethods(CacheTestBase):
old_size = self.db.cacheSize()
self.db.cacheFullSweep()
new_size = self.db.cacheSize()
self.assert_(new_size < old_size, "%s < %s" % (old_size, new_size))
self.assertTrue(new_size < old_size, "%s < %s" % (old_size, new_size))
def testMinimize(self):
old_size = self.db.cacheSize()
self.db.cacheMinimize()
new_size = self.db.cacheSize()
self.assert_(new_size < old_size, "%s < %s" % (old_size, new_size))
self.assertTrue(new_size < old_size, "%s < %s" % (old_size, new_size))
def testMinimizeTerminates(self):
# This is tricky. cPickleCache had a case where it could get into
......@@ -295,11 +295,11 @@ class LRUCacheTests(CacheTestBase):
if details['klass'].endswith('PersistentMapping'):
self.assertEqual(details['state'], None)
else:
self.assert_(details['klass'].endswith('MinPO'))
self.assertTrue(details['klass'].endswith('MinPO'))
self.assertEqual(details['state'], 0)
# The cache should never hold an unreferenced ghost.
if details['state'] is None: # i.e., it's a ghost
self.assert_(details['rc'] > 0)
self.assertTrue(details['rc'] > 0)
class StubDataManager:
def setklassstate(self, object):
......
......@@ -54,12 +54,12 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
def check_add(self):
from ZODB.POSException import InvalidObjectReference
obj = StubObject()
self.assert_(obj._p_oid is None)
self.assert_(obj._p_jar is None)
self.assertTrue(obj._p_oid is None)
self.assertTrue(obj._p_jar is None)
self.datamgr.add(obj)
self.assert_(obj._p_oid is not None)
self.assert_(obj._p_jar is self.datamgr)
self.assert_(self.datamgr.get(obj._p_oid) is obj)
self.assertTrue(obj._p_oid is not None)
self.assertTrue(obj._p_jar is self.datamgr)
self.assertTrue(self.datamgr.get(obj._p_oid) is obj)
# Only first-class persistent objects may be added.
self.assertRaises(TypeError, self.datamgr.add, object())
......@@ -82,8 +82,8 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
self.datamgr.add(obj)
oid = obj._p_oid
self.datamgr.abort(self.transaction)
self.assert_(obj._p_oid is None)
self.assert_(obj._p_jar is None)
self.assertTrue(obj._p_oid is None)
self.assertTrue(obj._p_jar is None)
self.assertRaises(KeyError, self.datamgr.get, oid)
def checkResetOnTpcAbort(self):
......@@ -97,8 +97,8 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
# Let's pretend something bad happens here.
# Call tpc_abort, clearing everything.
self.datamgr.tpc_abort(self.transaction)
self.assert_(obj._p_oid is None)
self.assert_(obj._p_jar is None)
self.assertTrue(obj._p_oid is None)
self.assertTrue(obj._p_jar is None)
self.assertRaises(KeyError, self.datamgr.get, oid)
def checkTpcAbortAfterCommit(self):
......@@ -109,8 +109,8 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
self.datamgr.commit(self.transaction)
# Let's pretend something bad happened here.
self.datamgr.tpc_abort(self.transaction)
self.assert_(obj._p_oid is None)
self.assert_(obj._p_jar is None)
self.assertTrue(obj._p_oid is None)
self.assertTrue(obj._p_jar is None)
self.assertRaises(KeyError, self.datamgr.get, oid)
self.assertEquals(self.db.storage._stored, [oid])
......@@ -121,11 +121,11 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
self.datamgr.tpc_begin(self.transaction)
self.datamgr.commit(self.transaction)
self.datamgr.tpc_finish(self.transaction)
self.assert_(obj._p_oid is oid)
self.assert_(obj._p_jar is self.datamgr)
self.assertTrue(obj._p_oid is oid)
self.assertTrue(obj._p_jar is self.datamgr)
# This next assert_ is covered by an assert in tpc_finish.
##self.assert_(not self.datamgr._added)
# This next assertTrue is covered by an assert in tpc_finish.
##self.assertTrue(not self.datamgr._added)
self.assertEquals(self.db.storage._stored, [oid])
self.assertEquals(self.db.storage._finished, [oid])
......@@ -140,11 +140,12 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
self.datamgr.commit(self.transaction)
self.datamgr.tpc_finish(self.transaction)
storage = self.db.storage
self.assert_(obj._p_oid in storage._stored, "object was not stored")
self.assert_(subobj._p_oid in storage._stored,
self.assertTrue(obj._p_oid in storage._stored, "object was not stored")
self.assertTrue(subobj._p_oid in storage._stored,
"subobject was not stored")
self.assert_(member._p_oid in storage._stored, "member was not stored")
self.assert_(self.datamgr._added_during_commit is None)
self.assertTrue(member._p_oid in storage._stored,
"member was not stored")
self.assertTrue(self.datamgr._added_during_commit is None)
def checkUnusedAddWorks(self):
# When an object is added, but not committed, it shouldn't be stored,
......@@ -153,7 +154,7 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
self.datamgr.add(obj)
self.datamgr.tpc_begin(self.transaction)
self.datamgr.tpc_finish(self.transaction)
self.assert_(obj._p_oid not in self.datamgr._storage._stored)
self.assertTrue(obj._p_oid not in self.datamgr._storage._stored)
def check__resetCacheResetsReader(self):
# https://bugs.launchpad.net/zodb/+bug/142667
......@@ -1043,13 +1044,13 @@ class EstimatedSizeTests(ZODB.tests.util.TestCase):
obj, cache = self.obj, self.conn._cache
# we have just written "obj". Its size should not be zero
size, cache_size = obj._p_estimated_size, cache.total_estimated_size
self.assert_(size > 0)
self.assert_(cache_size > size)
self.assertTrue(size > 0)
self.assertTrue(cache_size > size)
# increase the size, write again and check that the size changed
obj.setValueWithSize(1000)
transaction.commit()
new_size = obj._p_estimated_size
self.assert_(new_size > size)
self.assertTrue(new_size > size)
self.assertEqual(cache.total_estimated_size,
cache_size + new_size - size)
......@@ -1061,7 +1062,7 @@ class EstimatedSizeTests(ZODB.tests.util.TestCase):
obj.setValueWithSize(1000)
transaction.savepoint()
new_size = obj._p_estimated_size
self.assert_(new_size > size)
self.assertTrue(new_size > size)
self.assertEqual(cache.total_estimated_size,
cache_size + new_size - size)
......@@ -1074,7 +1075,7 @@ class EstimatedSizeTests(ZODB.tests.util.TestCase):
cache_size = cache.total_estimated_size
obj.value
size = obj._p_estimated_size
self.assert_(size > 0)
self.assertTrue(size > 0)
self.assertEqual(cache.total_estimated_size, cache_size + size)
# we test here as well that the deactivation works reduced the cache
# size
......@@ -1124,11 +1125,11 @@ class EstimatedSizeTests(ZODB.tests.util.TestCase):
# verify the change worked as expected
self.assertEqual(cache.cache_size_bytes, 1)
# verify our entrance assumption is fullfilled
self.assert_(cache.total_estimated_size > 1)
self.assertTrue(cache.total_estimated_size > 1)
conn.cacheGC()
self.assert_(cache.total_estimated_size <= 1)
self.assertTrue(cache.total_estimated_size <= 1)
# sanity check
self.assert_(cache.total_estimated_size >= 0)
self.assertTrue(cache.total_estimated_size >= 0)
def test_cache_garbage_collection_shrinking_object(self):
db = self.db
......@@ -1138,18 +1139,18 @@ class EstimatedSizeTests(ZODB.tests.util.TestCase):
# verify the change worked as expected
self.assertEqual(cache.cache_size_bytes, 1000)
# verify our entrance assumption is fullfilled
self.assert_(cache.total_estimated_size > 1)
self.assertTrue(cache.total_estimated_size > 1)
# give the objects some size
obj.setValueWithSize(500)
transaction.savepoint()
self.assert_(cache.total_estimated_size > 500)
self.assertTrue(cache.total_estimated_size > 500)
# make the object smaller
obj.setValueWithSize(100)
transaction.savepoint()
# make sure there was no overflow
self.assert_(cache.total_estimated_size != 0)
self.assertTrue(cache.total_estimated_size != 0)
# the size is not larger than the allowed maximum
self.assert_(cache.total_estimated_size <= 1000)
self.assertTrue(cache.total_estimated_size <= 1000)
# ---- stubs
......
......@@ -73,7 +73,7 @@ class DBTests(ZODB.tests.util.TestCase):
# have tests of referencesf.
import ZODB.serialize
self.assert_(self.db.references is ZODB.serialize.referencesf)
self.assertTrue(self.db.references is ZODB.serialize.referencesf)
def test_invalidateCache():
......
......@@ -71,16 +71,16 @@ class DemoStorageTests(
def checkLengthAndBool(self):
self.assertEqual(len(self._storage), 0)
self.assert_(not self._storage)
self.assertTrue(not self._storage)
db = DB(self._storage) # creates object 0. :)
self.assertEqual(len(self._storage), 1)
self.assert_(self._storage)
self.assertTrue(self._storage)
conn = db.open()
for i in range(10):
conn.root()[i] = conn.root().__class__()
transaction.commit()
self.assertEqual(len(self._storage), 11)
self.assert_(self._storage)
self.assertTrue(self._storage)
def checkLoadBeforeUndo(self):
pass # we don't support undo yet
......
......@@ -113,13 +113,13 @@ class FileStorageTests(
# Convert it to a dict.
old_index = self.convert_index_to_dict()
self.assert_(isinstance(old_index, fsIndex))
self.assertTrue(isinstance(old_index, fsIndex))
new_index = self.convert_index_to_dict()
self.assert_(isinstance(new_index, dict))
self.assertTrue(isinstance(new_index, dict))
# Verify it's converted to fsIndex in memory upon open.
self.open(read_only=read_only)
self.assert_(isinstance(self._storage._index, fsIndex))
self.assertTrue(isinstance(self._storage._index, fsIndex))
# Verify it has the right content.
newindex_as_dict = dict(self._storage._index)
......@@ -129,9 +129,9 @@ class FileStorageTests(
self._storage.close()
current_index = self.convert_index_to_dict()
if read_only:
self.assert_(isinstance(current_index, dict))
self.assertTrue(isinstance(current_index, dict))
else:
self.assert_(isinstance(current_index, fsIndex))
self.assertTrue(isinstance(current_index, fsIndex))
def check_conversion_to_fsIndex_readonly(self):
# Same thing, but the disk .index should continue to hold a
......@@ -161,8 +161,8 @@ class FileStorageTests(
# Verify it's converted to fsIndex in memory upon open.
self.open()
self.assert_(isinstance(self._storage._index, fsIndex))
self.assert_(isinstance(self._storage._index._data, OOBTree))
self.assertTrue(isinstance(self._storage._index, fsIndex))
self.assertTrue(isinstance(self._storage._index._data, OOBTree))
# Verify it has the right content.
new_data_dict = dict(self._storage._index._data)
......@@ -259,7 +259,7 @@ class FileStorageTests(
try:
self._storage.pack(time.time(), referencesf)
except CorruptedError as detail:
self.assert_("redundant transaction length does not match "
self.assertTrue("redundant transaction length does not match "
"initial transaction length" in str(detail))
else:
self.fail("expected CorruptedError")
......
......@@ -44,14 +44,14 @@ class MVCCTests:
r1['myobj'] = 'yes'
c2 = db.open(transaction.TransactionManager())
r2 = c2.root()
self.assert_('myobj' not in r2)
self.assertTrue('myobj' not in r2)
c1.transaction_manager.commit()
self.assert_('myobj' not in r2)
self.assertTrue('myobj' not in r2)
c2.sync()
self.assert_('myobj' in r2)
self.assert_(r2['myobj'] == 'yes')
self.assertTrue('myobj' in r2)
self.assertTrue(r2['myobj'] == 'yes')
finally:
db.close()
......@@ -82,18 +82,18 @@ class MVCCTests:
# The second connection will now load root['alpha'], but due to
# MVCC, it should continue to see the old state.
self.assert_(r2['alpha']._p_changed is None) # A ghost
self.assert_(not r2['alpha'])
self.assert_(r2['alpha']._p_changed == 0)
self.assertTrue(r2['alpha']._p_changed is None) # A ghost
self.assertTrue(not r2['alpha'])
self.assertTrue(r2['alpha']._p_changed == 0)
# make root['alpha'] visible to the second connection
c2.sync()
# Now it should be in sync
self.assert_(r2['alpha']._p_changed is None) # A ghost
self.assert_(r2['alpha'])
self.assert_(r2['alpha']._p_changed == 0)
self.assert_(r2['alpha']['beta'] == 'yes')
self.assertTrue(r2['alpha']._p_changed is None) # A ghost
self.assertTrue(r2['alpha'])
self.assertTrue(r2['alpha']._p_changed == 0)
self.assertTrue(r2['alpha']['beta'] == 'yes')
# Repeat the test with root['gamma']
r1['gamma']['delta'] = 'yes'
......@@ -108,18 +108,18 @@ class MVCCTests:
# The second connection will now load root[3], but due to MVCC,
# it should continue to see the old state.
self.assert_(r2['gamma']._p_changed is None) # A ghost
self.assert_(not r2['gamma'])
self.assert_(r2['gamma']._p_changed == 0)
self.assertTrue(r2['gamma']._p_changed is None) # A ghost
self.assertTrue(not r2['gamma'])
self.assertTrue(r2['gamma']._p_changed == 0)
# make root[3] visible to the second connection
c2.sync()
# Now it should be in sync
self.assert_(r2['gamma']._p_changed is None) # A ghost
self.assert_(r2['gamma'])
self.assert_(r2['gamma']._p_changed == 0)
self.assert_(r2['gamma']['delta'] == 'yes')
self.assertTrue(r2['gamma']._p_changed is None) # A ghost
self.assertTrue(r2['gamma'])
self.assertTrue(r2['gamma']._p_changed == 0)
self.assertTrue(r2['gamma']['delta'] == 'yes')
finally:
db.close()
......
......@@ -216,7 +216,7 @@ class TestPList(unittest.TestCase):
def checkBackwardCompat(self):
# Verify that the sanest of the ZODB 3.2 dotted paths still works.
from ZODB.PersistentList import PersistentList as oldPath
self.assert_(oldPath is PersistentList)
self.assertTrue(oldPath is PersistentList)
def test_suite():
return unittest.makeSuite(TestPList, 'check')
......
......@@ -69,8 +69,8 @@ class PMTests(unittest.TestCase):
# If the root can be loaded successfully, we should be okay.
r = db.open().root()
# But make sure it looks like a new mapping
self.assert_(hasattr(r, 'data'))
self.assert_(not hasattr(r, '_container'))
self.assertTrue(hasattr(r, 'data'))
self.assertTrue(not hasattr(r, '_container'))
# TODO: This test fails in ZODB 3.3a1. It's making some assumption(s)
# about pickles that aren't true. Hard to say when it stopped working,
......@@ -97,15 +97,15 @@ class PMTests(unittest.TestCase):
state = u.load()
inst.__setstate__(state)
self.assert_(hasattr(inst, '_container'))
self.assert_(not hasattr(inst, 'data'))
self.assertTrue(hasattr(inst, '_container'))
self.assertTrue(not hasattr(inst, 'data'))
def checkBackwardCompat(self):
# Verify that the sanest of the ZODB 3.2 dotted paths still works.
from persistent.mapping import PersistentMapping as newPath
from ZODB.PersistentMapping import PersistentMapping as oldPath
self.assert_(oldPath is newPath)
self.assertTrue(oldPath is newPath)
def checkBasicOps(self):
from persistent.mapping import PersistentMapping
......@@ -113,7 +113,7 @@ class PMTests(unittest.TestCase):
m['name'] = 'bob'
self.assertEqual(m['name'], "bob")
self.assertEqual(m.get('name', 42), "bob")
self.assert_('name' in m)
self.assertTrue('name' in m)
try:
m['fred']
......@@ -121,7 +121,7 @@ class PMTests(unittest.TestCase):
pass
else:
self.fail("expected KeyError")
self.assert_('fred' not in m)
self.assertTrue('fred' not in m)
self.assertEqual(m.get('fred'), None)
self.assertEqual(m.get('fred', 42), 42)
......
......@@ -102,8 +102,8 @@ class RecoverTest(ZODB.tests.util.TestCase):
# fact not damaged.
def testNoDamage(self):
output = self.recover()
self.assert_('error' not in output, output)
self.assert_('\n0 bytes removed during recovery' in output, output)
self.assertTrue('error' not in output, output)
self.assertTrue('\n0 bytes removed during recovery' in output, output)
# Verify that the recovered database is identical to the original.
before = open(self.path, 'rb')
......@@ -121,7 +121,7 @@ class RecoverTest(ZODB.tests.util.TestCase):
for i in range(self.ITERATIONS):
self.damage(1, 1024)
output = self.recover()
self.assert_('error' in output, output)
self.assertTrue('error' in output, output)
self.recovered = FileStorage(self.dest)
self.recovered.close()
os.remove(self.path)
......@@ -131,7 +131,7 @@ class RecoverTest(ZODB.tests.util.TestCase):
for i in range(self.ITERATIONS):
self.damage(4, 512)
output = self.recover()
self.assert_('error' in output, output)
self.assertTrue('error' in output, output)
self.recovered = FileStorage(self.dest)
self.recovered.close()
os.remove(self.path)
......@@ -141,7 +141,7 @@ class RecoverTest(ZODB.tests.util.TestCase):
for i in range(self.ITERATIONS):
self.damage(1, 32 * 1024)
output = self.recover()
self.assert_('error' in output, output)
self.assertTrue('error' in output, output)
self.recovered = FileStorage(self.dest)
self.recovered.close()
os.remove(self.path)
......
......@@ -95,14 +95,14 @@ class SerializerTestCase(unittest.TestCase):
r = TestObjectReader(factory=test_factory)
g = r.getGhost(self.old_style_with_newargs)
self.assert_(isinstance(g, ClassWithNewargs))
self.assertTrue(isinstance(g, ClassWithNewargs))
self.assertEqual(g, 1)
g = r.getGhost(self.old_style_without_newargs)
self.assert_(isinstance(g, ClassWithoutNewargs))
self.assertTrue(isinstance(g, ClassWithoutNewargs))
g = r.getGhost(self.new_style_with_newargs)
self.assert_(isinstance(g, ClassWithNewargs))
self.assertTrue(isinstance(g, ClassWithNewargs))
g = r.getGhost(self.new_style_without_newargs)
self.assert_(isinstance(g, ClassWithoutNewargs))
self.assertTrue(isinstance(g, ClassWithoutNewargs))
def test_myhasattr(self):
......@@ -124,14 +124,15 @@ class SerializerTestCase(unittest.TestCase):
serialize.myhasattr, OldStyle(), "error")
self.assertRaises(ValueError,
serialize.myhasattr, NewStyle(), "error")
self.assert_(serialize.myhasattr(OldStyle(), "bar"))
self.assert_(serialize.myhasattr(NewStyle(), "bar"))
self.assert_(not serialize.myhasattr(OldStyle(), "rat"))
self.assert_(not serialize.myhasattr(NewStyle(), "rat"))
self.assertTrue(serialize.myhasattr(OldStyle(), "bar"))
self.assertTrue(serialize.myhasattr(NewStyle(), "bar"))
self.assertTrue(not serialize.myhasattr(OldStyle(), "rat"))
self.assertTrue(not serialize.myhasattr(NewStyle(), "rat"))
def test_suite():
suite = unittest.makeSuite(SerializerTestCase)
suite.addTest(
doctest.DocTestSuite("ZODB.serialize", checker=ZODB.tests.util.checker))
doctest.DocTestSuite("ZODB.serialize",
checker=ZODB.tests.util.checker))
return suite
......@@ -88,7 +88,7 @@ class TestUtils(unittest.TestCase):
# The pickle contains a GLOBAL ('c') opcode resolving to MinPO's
# module and class.
self.assert_(b'cZODB.tests.MinPO\nMinPO\n' in data)
self.assertTrue(b'cZODB.tests.MinPO\nMinPO\n' in data)
# Fiddle the pickle so it points to something "impossible" instead.
data = data.replace(b'cZODB.tests.MinPO\nMinPO\n',
......
......@@ -108,7 +108,7 @@ class ZODBTests(ZODB.tests.util.TestCase):
l1 = list(map(lambda k_v: (k_v[0], k_v[1][0]), l1))
l2 = list(map(lambda k_v1: (k_v1[0], k_v1[1][0]), l2))
self.assertEqual(l1, l2)
self.assert_(ob._p_oid != ob2._p_oid)
self.assertTrue(ob._p_oid != ob2._p_oid)
self.assertEqual(ob._p_jar, ob2._p_jar)
oids = {}
for v in ob.values():
......@@ -129,7 +129,7 @@ class ZODBTests(ZODB.tests.util.TestCase):
self.populate()
conn = self._db.open()
conn.root()
self.assert_(len(conn._cache) > 0) # Precondition
self.assertTrue(len(conn._cache) > 0) # Precondition
conn._resetCache()
self.assertEqual(len(conn._cache), 0)
......@@ -139,10 +139,10 @@ class ZODBTests(ZODB.tests.util.TestCase):
self.populate()
conn = self._db.open()
conn.root()
self.assert_(len(conn._cache) > 0) # Precondition
self.assertTrue(len(conn._cache) > 0) # Precondition
ZODB.Connection.resetCaches()
conn.close()
self.assert_(len(conn._cache) > 0) # Still not flushed
self.assertTrue(len(conn._cache) > 0) # Still not flushed
conn.open() # simulate the connection being reopened
self.assertEqual(len(conn._cache), 0)
......@@ -517,12 +517,12 @@ class ReadConflictTests(ZODB.tests.util.TestCase):
self.fail("No conflict occurred")
# real_data2 still ready to commit
self.assert_(real_data2._p_changed)
self.assertTrue(real_data2._p_changed)
# index2 values not ready to commit
self.assert_(not index2._p_changed)
self.assert_(not index2[0]._p_changed)
self.assert_(not index2[1]._p_changed)
self.assertTrue(not index2._p_changed)
self.assertTrue(not index2[0]._p_changed)
self.assertTrue(not index2[1]._p_changed)
self.assertRaises(ReadConflictError, tm.get().commit)
self.assertRaises(TransactionFailedError, tm.get().commit)
......
......@@ -280,7 +280,7 @@ class RecoveryBlobStorage(BlobTestBase,
# Requires a setUp() that creates a self._dst destination storage
def testSimpleBlobRecovery(self):
self.assert_(
self.assertTrue(
ZODB.interfaces.IBlobStorageRestoreable.providedBy(self._storage)
)
db = DB(self._storage)
......@@ -514,7 +514,7 @@ def loadblob_tmpstore():
We can access the blob correctly:
>>> tmpstore.loadBlob(blob_oid, tid) == blob_storage.loadBlob(blob_oid, tid)
>>> tmpstore.loadBlob(blob_oid,tid) == blob_storage.loadBlob(blob_oid,tid)
True
Clean up:
......
......@@ -36,21 +36,21 @@ class Test(unittest.TestCase):
def test__del__(self):
index = self.index
self.assert_(p64(1000) in index)
self.assert_(p64(100*1000) in index)
self.assertTrue(p64(1000) in index)
self.assertTrue(p64(100*1000) in index)
del self.index[p64(1000)]
del self.index[p64(100*1000)]
self.assert_(p64(1000) not in index)
self.assert_(p64(100*1000) not in index)
self.assertTrue(p64(1000) not in index)
self.assertTrue(p64(100*1000) not in index)
for key in list(self.index):
del index[key]
self.assert_(not index)
self.assertTrue(not index)
# Whitebox. Make sure empty buckets are removed
self.assert_(not index._data)
self.assertTrue(not index._data)
def testInserts(self):
index = self.index
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment