Commit 3fd87e79 authored by Jim Fulton's avatar Jim Fulton

Updated most tests to use a commob test framework to try to make sure

tests have a clean transaction state and to avoid leaving files behind.
parent 59188e56
......@@ -8,7 +8,6 @@ import ZODB
import os
import socket
import tempfile
import threading
import time
import unittest
......@@ -22,11 +21,12 @@ import unittest
class PackerTests(StorageTestBase):
def setUp(self):
StorageTestBase.setUp(self)
self.started = 0
def start(self):
self.started =1
self.path = tempfile.mktemp(suffix=".fs")
self.path = 'Data.fs'
self._storage = FileStorage(self.path)
self.db = ZODB.DB(self._storage)
self.do_updates()
......@@ -37,26 +37,20 @@ class PackerTests(StorageTestBase):
self._dostore()
def tearDown(self):
if not self.started:
return
self.db.close()
self._storage.close()
self.exit.close()
try:
os.kill(self.pid, 9)
except os.error:
pass
try:
os.waitpid(self.pid, 0)
except os.error, err:
##print "waitpid failed", err
pass
for ext in '', '.old', '.lock', '.index', '.tmp':
path = self.path + ext
if self.started:
self.db.close()
self.exit.close()
try:
os.remove(path)
os.kill(self.pid, 9)
except os.error:
pass
try:
os.waitpid(self.pid, 0)
except os.error, err:
##print "waitpid failed", err
pass
StorageTestBase.tearDown(self)
def set_inet_addr(self):
self.host = socket.gethostname()
......@@ -80,7 +74,7 @@ class PackerTests(StorageTestBase):
assert not os.path.exists(self.path + ".old")
def testAF_UNIXPack(self):
self.addr = tempfile.mktemp(suffix=".zeo-socket")
self.addr = "zeo-socket"
self.start()
status = os.system("zeopack.py -U %s" % self.addr)
assert status == 0
......
......@@ -17,7 +17,6 @@ import sys
import time
import random
import asyncore
import tempfile
import threading
import logging
......@@ -33,6 +32,7 @@ from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase \
import zodb_pickle, zodb_unpickle, handle_all_serials, handle_serials
import ZODB.tests.util
import transaction
from transaction import Transaction
......@@ -93,7 +93,7 @@ class CommonSetupTearDown(StorageTestBase):
monitor = 0
db_class = DummyDB
def setUp(self):
def setUp(self, before=None):
"""Test setup for connection tests.
This starts only one server; a test may start more servers by
......@@ -102,8 +102,7 @@ class CommonSetupTearDown(StorageTestBase):
"""
self.__super_setUp()
logging.info("setUp() %s", self.id())
fd, self.file = tempfile.mkstemp()
os.close(fd)
self.file = 'storage_conf'
self.addr = []
self._pids = []
self._servers = []
......@@ -141,8 +140,7 @@ class CommonSetupTearDown(StorageTestBase):
for c in self.caches:
for i in 0, 1:
for ext in "", ".trace", ".lock":
base = "%s-%s.zec%s" % (c, "1", ext)
path = os.path.join(tempfile.tempdir, base)
path = "%s-%s.zec%s" % (c, "1", ext)
# On Windows before 2.3, we don't have a way to wait for
# the spawned server(s) to close, and they inherited
# file descriptors for our open files. So long as those
......@@ -183,7 +181,7 @@ class CommonSetupTearDown(StorageTestBase):
self.caches.append(cache)
storage = TestClientStorage(self.addr,
client=cache,
var=tempfile.tempdir,
var='.',
cache_size=cache_size,
wait=wait,
min_disconnect_poll=0.1,
......
......@@ -30,7 +30,9 @@ class AuthTest(CommonSetupTearDown):
realm = None
def setUp(self):
self.pwfile = tempfile.mktemp()
fd, self.pwfile = tempfile.mkstemp('pwfile')
os.close(fd)
if self.realm:
self.pwdb = self.dbclass(self.pwfile, self.realm)
else:
......@@ -38,6 +40,7 @@ class AuthTest(CommonSetupTearDown):
self.pwdb.add_user("foo", "bar")
self.pwdb.save()
self._checkZEOpasswd()
self.__super_setUp()
def _checkZEOpasswd(self):
......@@ -51,8 +54,8 @@ class AuthTest(CommonSetupTearDown):
zeopasswd.main(args + ["foo", "bar"])
def tearDown(self):
self.__super_tearDown()
os.remove(self.pwfile)
self.__super_tearDown()
def getConfig(self, path, create, read_only):
return "<mappingstorage 1/>"
......
......@@ -222,6 +222,7 @@ class GenericTests(
blob_cache_dir = None
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
logger.info("setUp() %s", self.id())
port = get_port()
zconf = forker.ZEOConfig(('', port))
......@@ -232,7 +233,9 @@ class GenericTests(
self._conf_path = path
if not self.blob_cache_dir:
# This is the blob cache for ClientStorage
self.blob_cache_dir = tempfile.mkdtemp()
self.blob_cache_dir = tempfile.mkdtemp(
'blob_cache',
dir=os.path.abspath(os.getcwd()))
self._storage = ClientStorage(
zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1,
......@@ -242,14 +245,13 @@ class GenericTests(
def tearDown(self):
self._storage.close()
os.remove(self._conf_path)
ZODB.blob.remove_committed_dir(self.blob_cache_dir)
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
# Not in Windows Python until 2.3
for pid in self._pids:
os.waitpid(pid, 0)
StorageTestBase.StorageTestBase.tearDown(self)
def runTest(self):
try:
......@@ -300,29 +302,22 @@ class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
level = 2
def setUp(self):
self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
def getConfig(self):
filename = self.__fs_base = tempfile.mktemp()
return """\
<filestorage 1>
path %s
</filestorage>
""" % filename
""" % tempfile.mktemp(dir='.')
def _new_storage(self):
port = get_port()
zconf = forker.ZEOConfig(('', port))
zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
zconf, port)
blob_cache_dir = tempfile.mkdtemp()
self._pids.append(pid)
self._servers.append(adminaddr)
self._conf_paths.append(path)
self.blob_cache_dirs.append(blob_cache_dir)
blob_cache_dir = tempfile.mkdtemp(dir='.')
storage = ClientStorage(
zport, '1', cache_size=20000000,
......@@ -332,10 +327,9 @@ class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
return storage
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self._pids = []
self._servers = []
self._conf_paths = []
self.blob_cache_dirs = []
self._storage = self._new_storage()
self._dst = self._new_storage()
......@@ -344,16 +338,13 @@ class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
self._storage.close()
self._dst.close()
for p in self._conf_paths:
os.remove(p)
for p in self.blob_cache_dirs:
ZODB.blob.remove_committed_dir(p)
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
# Not in Windows Python until 2.3
for pid in self._pids:
os.waitpid(pid, 0)
StorageTestBase.StorageTestBase.tearDown(self)
def new_dest(self):
return self._new_storage()
......@@ -364,12 +355,11 @@ class FileStorageTests(FullGenericTests):
level = 2
def getConfig(self):
filename = self.__fs_base = tempfile.mktemp()
return """\
<filestorage 1>
path %s
path Data.fs
</filestorage>
""" % filename
"""
def checkInterfaceFromRemoteStorage(self):
# ClientStorage itself doesn't implement IStorageIteration, but the
......@@ -416,10 +406,10 @@ class DemoStorageTests(
return """
<demostorage 1>
<filestorage 1>
path %s
path Data.fs
</filestorage>
</demostorage>
""" % tempfile.mktemp()
"""
def checkUndoZombie(self):
# The test base class IteratorStorage assumes that we keep undo data
......@@ -615,22 +605,18 @@ test_classes = [OneTimeTests,
class CommonBlobTests:
def tearDown(self):
super(BlobAdaptedFileStorageTests, self).tearDown()
if os.path.exists(self.blobdir):
# Might be gone already if the super() method deleted
# the shared directory. Don't worry.
shutil.rmtree(self.blobdir)
def getConfig(self):
return """
<blobstorage 1>
blob-dir %s
blob-dir blobs
<filestorage 2>
path %s
path Data.fs
</filestorage>
</blobstorage>
""" % (self.blobdir, self.filestorage)
"""
blobdir = 'blobs'
blob_cache_dir = 'blob_cache'
def checkStoreBlob(self):
from ZODB.utils import oid_repr, tid_repr
......@@ -713,24 +699,17 @@ class CommonBlobTests:
def checkTransactionBufferCleanup(self):
oid = self._storage.new_oid()
handle, blob_file_name = tempfile.mkstemp()
os.close(handle)
open(blob_file_name, 'w').write('I am a happy blob.')
open('blob_file', 'w').write('I am a happy blob.')
t = transaction.Transaction()
self._storage.tpc_begin(t)
self._storage.storeBlob(
oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
oid, ZODB.utils.z64, 'foo', 'blob_file', '', t)
self._storage.close()
class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
self.blobdir = tempfile.mkdtemp() # blob directory on ZEO server
self.filestorage = tempfile.mktemp()
super(BlobAdaptedFileStorageTests, self).setUp()
def checkStoreAndLoadBlob(self):
from ZODB.utils import oid_repr, tid_repr
from ZODB.blob import Blob, BLOB_SUFFIX
......@@ -814,12 +793,8 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
class BlobWritableCacheTests(FullGenericTests, CommonBlobTests):
def setUp(self):
self.blobdir = self.blob_cache_dir = tempfile.mkdtemp()
self.filestorage = tempfile.mktemp()
self.shared_blob_dir = True
super(BlobWritableCacheTests, self).setUp()
blob_cache_dir = 'blobs'
shared_blob_dir = True
class StorageServerClientWrapper:
......
......@@ -43,7 +43,8 @@ def hexprint(file):
printable = ""
hex = ""
for character in line:
if character in string.printable and not ord(character) in [12,13,9]:
if (character in string.printable
and not ord(character) in [12,13,9]):
printable += character
else:
printable += '.'
......
......@@ -9,11 +9,12 @@ import re
import struct
import tempfile
import unittest
import ZODB.tests.util
import fstest
from fstest import FormatError, U64
class TestCorruptedFS(unittest.TestCase):
class TestCorruptedFS(ZODB.tests.util.TestCase):
f = open('test-checker.fs', 'rb')
datafs = f.read()
......@@ -21,17 +22,14 @@ class TestCorruptedFS(unittest.TestCase):
del f
def setUp(self):
self._temp = tempfile.mktemp()
ZODB.tests.util.TestCase.setUp(self)
self._temp = 'Data.fs'
self._file = open(self._temp, 'wb')
def tearDown(self):
if not self._file.closed:
self._file.close()
if os.path.exists(self._temp):
try:
os.remove(self._temp)
except os.error:
pass
ZODB.tests.util.TestCase.tearDown(self)
def noError(self):
if not self._file.closed:
......
......@@ -22,11 +22,12 @@ import unittest
class PackerTests(StorageTestBase):
def setUp(self):
StorageTestBase.setUp(self)
self.started = 0
def start(self):
self.started =1
self.path = tempfile.mktemp(suffix=".fs")
self.path = 'Data.fs'
self._storage = FileStorage(self.path)
self.db = ZODB.DB(self._storage)
self.do_updates()
......@@ -37,26 +38,19 @@ class PackerTests(StorageTestBase):
self._dostore()
def tearDown(self):
if not self.started:
return
self.db.close()
self._storage.close()
self.exit.close()
try:
os.kill(self.pid, 9)
except os.error:
pass
try:
os.waitpid(self.pid, 0)
except os.error, err:
##print "waitpid failed", err
pass
for ext in '', '.old', '.lock', '.index', '.tmp':
path = self.path + ext
if self.started:
self.db.close()
self.exit.close()
try:
os.remove(path)
os.kill(self.pid, 9)
except os.error:
pass
try:
os.waitpid(self.pid, 0)
except os.error, err:
##print "waitpid failed", err
pass
StorageTestBase.tearDown(self)
def set_inet_addr(self):
self.host = socket.gethostname()
......
......@@ -16,7 +16,6 @@
import os
import random
import stat
import tempfile
import ZODB, ZODB.FileStorage
from StorageTestBase import StorageTestBase
......@@ -24,12 +23,8 @@ from StorageTestBase import StorageTestBase
class FileStorageCorruptTests(StorageTestBase):
def setUp(self):
self.path = tempfile.mktemp()
self._storage = ZODB.FileStorage.FileStorage(self.path, create=1)
def tearDown(self):
self._storage.close()
self._storage.cleanup()
StorageTestBase.setUp(self)
self._storage = ZODB.FileStorage.FileStorage('Data.fs', create=1)
def _do_stores(self):
oids = []
......@@ -49,16 +44,15 @@ class FileStorageCorruptTests(StorageTestBase):
self._close()
# truncation the index file
path = self.path + '.index'
self.failUnless(os.path.exists(path))
f = open(path, 'r+')
self.failUnless(os.path.exists('Data.fs.index'))
f = open('Data.fs.index', 'r+')
f.seek(0, 2)
size = f.tell()
f.seek(size / 2)
f.truncate()
f.close()
self._storage = ZODB.FileStorage.FileStorage(self.path)
self._storage = ZODB.FileStorage.FileStorage('Data.fs')
self._check_stores(oids)
def checkCorruptedIndex(self):
......@@ -66,14 +60,13 @@ class FileStorageCorruptTests(StorageTestBase):
self._close()
# truncation the index file
path = self.path + '.index'
self.failUnless(os.path.exists(path))
size = os.stat(path)[stat.ST_SIZE]
f = open(path, 'r+')
self.failUnless(os.path.exists('Data.fs.index'))
size = os.stat('Data.fs.index')[stat.ST_SIZE]
f = open('Data.fs.index', 'r+')
while f.tell() < size:
f.seek(random.randrange(1, size / 10), 1)
f.write('\000')
f.close()
self._storage = ZODB.FileStorage.FileStorage(self.path)
self._storage = ZODB.FileStorage.FileStorage('Data.fs')
self._check_stores(oids)
......@@ -24,6 +24,7 @@ import time
class RecoveryStorage(IteratorDeepCompare):
# Requires a setUp() that creates a self._dst destination storage
def checkSimpleRecovery(self):
oid = self._storage.new_oid()
......
......@@ -21,8 +21,6 @@ single object revision.
import sys
import time
import types
import unittest
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
......@@ -30,6 +28,7 @@ import transaction
from ZODB.utils import u64
from ZODB.tests.MinPO import MinPO
import ZODB.tests.util
ZERO = '\0'*8
......@@ -78,7 +77,7 @@ def zodb_unpickle(data):
u = Unpickler(f)
u.persistent_load = persistent_load
klass_info = u.load()
if isinstance(klass_info, types.TupleType):
if isinstance(klass_info, tuple):
if isinstance(klass_info[0], type):
# Unclear: what is the second part of klass_info?
klass, xxx = klass_info
......@@ -119,13 +118,13 @@ def handle_all_serials(oid, *args):
"""
d = {}
for arg in args:
if isinstance(arg, types.StringType):
if isinstance(arg, str):
d[oid] = arg
elif arg is None:
pass
else:
for oid, serial in arg:
if not isinstance(serial, types.StringType):
if not isinstance(serial, str):
raise serial # error from ZEO server
d[oid] = serial
return d
......@@ -142,14 +141,12 @@ def import_helper(name):
return sys.modules[name]
class StorageTestBase(unittest.TestCase):
class StorageTestBase(ZODB.tests.util.TestCase):
# It would be simpler if concrete tests didn't need to extend
# setUp() and tearDown().
def setUp(self):
# You need to override this with a setUp that creates self._storage
self._storage = None
_storage = None
def _close(self):
# You should override this if closing your storage requires additional
......@@ -159,6 +156,7 @@ class StorageTestBase(unittest.TestCase):
def tearDown(self):
self._close()
ZODB.tests.util.TestCase.tearDown(self)
def _dostore(self, oid=None, revid=None, data=None,
already_pickled=0, user=None, description=None):
......@@ -176,7 +174,7 @@ class StorageTestBase(unittest.TestCase):
revid = ZERO
if data is None:
data = MinPO(7)
if type(data) == types.IntType:
if type(data) == int:
data = MinPO(data)
if not already_pickled:
data = zodb_pickle(data)
......
......@@ -33,10 +33,9 @@ for one of our examples.)
>>> import ZODB.FileStorage
>>> from ZODB.blob import BlobStorage
>>> from ZODB.DB import DB
>>> from tempfile import mkdtemp
>>> base_storage = ZODB.FileStorage.FileStorage(
... 'BlobTests.fs', create=True)
>>> blob_dir = mkdtemp()
>>> blob_dir = 'blobs'
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
......@@ -106,4 +105,3 @@ After testing this, we don't need the storage directory and databases anymore:
>>> database.close()
>>> database2.close()
>>> blob_storage.close()
>>> base_storage.cleanup()
......@@ -23,11 +23,10 @@ Set up:
>>> from persistent.mapping import PersistentMapping
>>> import shutil
>>> import transaction
>>> from tempfile import mkdtemp, mktemp
>>> storagefile1 = mktemp()
>>> blob_dir1 = mkdtemp()
>>> storagefile2 = mktemp()
>>> blob_dir2 = mkdtemp()
>>> storagefile1 = 'Data.fs.1'
>>> blob_dir1 = 'blobs1'
>>> storagefile2 = 'Data.fs.2'
>>> blob_dir2 = 'blobs2'
We need an database with an undoing blob supporting storage:
......@@ -62,7 +61,7 @@ Export our blobs from a database1 connection:
>>> conn = root1['blobdata']._p_jar
>>> oid = root1['blobdata']._p_oid
>>> exportfile = mktemp()
>>> exportfile = 'export'
>>> nothing = connection1.exportFile(oid, exportfile)
Import our exported data into database2:
......
......@@ -84,40 +84,37 @@ already been used to create a lawn structure.
1. Non-existing directories will trigger a bushy layout:
>>> import tempfile
>>> import shutil
>>> d = tempfile.mkdtemp()
>>> shutil.rmtree(d)
>>> auto_layout_select(d)
>>> import os, shutil
>>> auto_layout_select('blobs')
'bushy'
2. Empty directories will trigger a bushy layout too:
>>> d = tempfile.mkdtemp()
>>> auto_layout_select(d)
>>> os.mkdir('blobs')
>>> auto_layout_select('blobs')
'bushy'
3. If the directory contains a marker for the strategy it will be used:
>>> from ZODB.blob import LAYOUT_MARKER
>>> import os.path
>>> open(os.path.join(d, LAYOUT_MARKER), 'wb').write('bushy')
>>> auto_layout_select(d)
>>> open(os.path.join('blobs', LAYOUT_MARKER), 'wb').write('bushy')
>>> auto_layout_select('blobs')
'bushy'
>>> open(os.path.join(d, LAYOUT_MARKER), 'wb').write('lawn')
>>> auto_layout_select(d)
>>> open(os.path.join('blobs', LAYOUT_MARKER), 'wb').write('lawn')
>>> auto_layout_select('blobs')
'lawn'
>>> shutil.rmtree(d)
>>> shutil.rmtree('blobs')
4. If the directory does not contain a marker but other files, we assume that
it was created with an earlier version of the blob implementation and uses our
`lawn` layout:
>>> d = tempfile.mkdtemp()
>>> open(os.path.join(d, '0x0101'), 'wb').write('foo')
>>> auto_layout_select(d)
>>> os.mkdir('blobs')
>>> open(os.path.join('blobs', '0x0101'), 'wb').write('foo')
>>> auto_layout_select('blobs')
'lawn'
>>> shutil.rmtree(d)
>>> shutil.rmtree('blobs')
Directory layout markers
......@@ -127,8 +124,7 @@ When the file system helper (FSH) is asked to create the directory structure,
it will leave a marker with the choosen layout if no marker exists yet:
>>> from ZODB.blob import FilesystemHelper
>>> d = tempfile.mkdtemp()
>>> blobs = os.path.join(d, 'blobs')
>>> blobs = 'blobs'
>>> fsh = FilesystemHelper(blobs)
>>> fsh.layout_name
'bushy'
......@@ -154,7 +150,7 @@ the marker will be used in the future:
>>> import ZODB.FileStorage
>>> from ZODB.blob import BlobStorage
>>> datafs = os.path.join(d, 'data.fs')
>>> datafs = 'data.fs'
>>> base_storage = ZODB.FileStorage.FileStorage(datafs)
>>> os.mkdir(blobs)
......@@ -164,13 +160,14 @@ the marker will be used in the future:
'lawn'
>>> open(os.path.join(blobs, LAYOUT_MARKER), 'rb').read()
'lawn'
>>> blob_storage = BlobStorage(blobs, base_storage, layout='bushy') # doctest: +ELLIPSIS
>>> blob_storage = BlobStorage('blobs', base_storage, layout='bushy')
... # doctest: +ELLIPSIS
Traceback (most recent call last):
ValueError: Directory layout `bushy` selected for blob directory .../blobs/, but marker found for layout `lawn`
>>> base_storage.close()
>>> rmtree(d)
>>> rmtree('blobs')
Migrating between directory layouts
......@@ -189,7 +186,8 @@ The migration is accessible as a library function:
Create a `lawn` directory structure and migrate it to the new `bushy` one:
>>> from ZODB.blob import FilesystemHelper
>>> d = tempfile.mkdtemp()
>>> d = 'd'
>>> os.mkdir(d)
>>> old = os.path.join(d, 'old')
>>> old_fsh = FilesystemHelper(old, 'lawn')
>>> old_fsh.create()
......
......@@ -23,11 +23,9 @@ Set up:
>>> from ZODB.blob import Blob, BlobStorage
>>> from ZODB import utils
>>> from ZODB.DB import DB
>>> import shutil
>>> import transaction
>>> from tempfile import mkdtemp, mktemp
>>> storagefile = mktemp()
>>> blob_dir = mkdtemp()
>>> storagefile = 'Data.fs'
>>> blob_dir = 'blobs'
A helper method to assure a unique timestamp across multiple platforms:
......@@ -261,7 +259,8 @@ We can also see, that the flag is set during the pack, by leveraging the
knowledge that the underlying storage's pack method is also called:
>>> def dummy_pack(time, ref):
... print "_blobs_pack_is_in_progress =", blob_storage._blobs_pack_is_in_progress
... print "_blobs_pack_is_in_progress =",
... print blob_storage._blobs_pack_is_in_progress
... return base_pack(time, ref)
>>> base_pack = base_storage.pack
>>> base_storage.pack = dummy_pack
......@@ -270,7 +269,3 @@ knowledge that the underlying storage's pack method is also called:
>>> blob_storage._blobs_pack_is_in_progress
False
>>> base_storage.pack = base_pack
Clean up our blob directory:
>>> rmtree(blob_dir)
......@@ -30,10 +30,9 @@ First, we need a datatabase with blob support::
>>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.blob import BlobStorage
>>> from ZODB.DB import DB
>>> from tempfile import mkdtemp
>>> import os.path
>>> base_storage = MappingStorage('test')
>>> blob_dir = mkdtemp()
>>> blob_dir = os.path.abspath('blobs')
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
......
......@@ -21,10 +21,8 @@ We need a database with a blob supporting storage::
>>> from ZODB.blob import Blob, BlobStorage
>>> from ZODB.DB import DB
>>> import transaction
>>> import tempfile
>>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp()
>>> blob_dir = 'blobs'
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
>>> connection1 = database.open()
......@@ -351,7 +349,7 @@ clean up dirty files:
... def tpc_abort(self):
... pass
>>> base_storage = DummyBaseStorage()
>>> blob_dir2 = mkdtemp()
>>> blob_dir2 = 'blobs2'
>>> blob_storage2 = BlobStorage(blob_dir2, base_storage)
>>> committed_blob_dir = blob_storage2.fshelper.getPathForOID(0)
>>> os.makedirs(committed_blob_dir)
......
......@@ -29,22 +29,23 @@ import transaction
import ZODB
import ZODB.MappingStorage
from ZODB.tests.MinPO import MinPO
import ZODB.tests.util
from ZODB.utils import p64
from persistent import Persistent
class CacheTestBase(unittest.TestCase):
class CacheTestBase(ZODB.tests.util.TestCase):
def setUp(self):
ZODB.tests.util.TestCase.setUp(self)
store = ZODB.MappingStorage.MappingStorage()
self.db = ZODB.DB(store,
cache_size = self.CACHE_SIZE)
self.conns = []
def tearDown(self):
for conn in self.conns:
conn.close()
self.db.close()
ZODB.tests.util.TestCase.tearDown(self)
CACHE_SIZE = 20
......@@ -89,10 +90,8 @@ class CantGetRidOfMe(MinPO):
class DBMethods(CacheTestBase):
__super_setUp = CacheTestBase.setUp
def setUp(self):
self.__super_setUp()
CacheTestBase.setUp(self)
for i in range(4):
self.noodle_new_connection()
......
......@@ -12,22 +12,18 @@
#
##############################################################################
import tempfile
import unittest
import transaction
import unittest
import ZODB.config
from ZODB.POSException import ReadOnlyError
import ZODB.POSException
import ZODB.tests.util
class ConfigTestBase(unittest.TestCase):
class ConfigTestBase(ZODB.tests.util.TestCase):
def _opendb(self, s):
return ZODB.config.databaseFromString(s)
def tearDown(self):
if getattr(self, "storage", None) is not None:
self.storage.cleanup()
def _test(self, s):
db = self._opendb(s)
self.storage = db._storage
......@@ -58,28 +54,26 @@ class ZODBConfigTest(ConfigTestBase):
""")
def test_file_config1(self):
path = tempfile.mktemp()
self._test(
"""
<zodb>
<filestorage>
path %s
path Data.fs
</filestorage>
</zodb>
""" % path)
""")
def test_file_config2(self):
path = tempfile.mktemp()
cfg = """
<zodb>
<filestorage>
path %s
path Data.fs
create false
read-only true
</filestorage>
</zodb>
""" % path
self.assertRaises(ReadOnlyError, self._test, cfg)
"""
self.assertRaises(ZODB.POSException.ReadOnlyError, self._test, cfg)
def test_demo_config(self):
cfg = """
......
......@@ -23,19 +23,18 @@ from ZODB.config import databaseFromString
from ZODB.utils import p64, u64
from ZODB.tests.warnhook import WarningsHook
from zope.interface.verify import verifyObject
import ZODB.tests.util
class ConnectionDotAdd(unittest.TestCase):
class ConnectionDotAdd(ZODB.tests.util.TestCase):
def setUp(self):
ZODB.tests.util.TestCase.setUp(self)
from ZODB.Connection import Connection
self.db = StubDatabase()
self.datamgr = Connection(self.db)
self.datamgr.open()
self.transaction = StubTransaction()
def tearDown(self):
transaction.abort()
def check_add(self):
from ZODB.POSException import InvalidObjectReference
obj = StubObject()
......@@ -524,10 +523,11 @@ class _PlayPersistent(Persistent):
def setValueWithSize(self, size=0): self.value = size*' '
__init__ = setValueWithSize
class EstimatedSizeTests(unittest.TestCase):
class EstimatedSizeTests(ZODB.tests.util.TestCase):
"""check that size estimations are handled correctly."""
def setUp(self):
ZODB.tests.util.TestCase.setUp(self)
self.db = db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
self.conn = c = db.open()
self.obj = obj = _PlayPersistent()
......@@ -545,7 +545,8 @@ class EstimatedSizeTests(unittest.TestCase):
transaction.commit()
new_size = obj._p_estimated_size
self.assert_(new_size > size)
self.assertEqual(cache.total_estimated_size, cache_size + new_size - size)
self.assertEqual(cache.total_estimated_size,
cache_size + new_size - size)
def test_size_set_on_write_savepoint(self):
obj, cache = self.obj, self.conn._cache
......
......@@ -11,36 +11,30 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from ZODB.tests.MinPO import MinPO
from zope.testing import doctest
import datetime
import os
import time
import unittest
import datetime
import transaction
from zope.testing import doctest
import unittest
import ZODB
import ZODB.FileStorage
from ZODB.tests.MinPO import MinPO
import ZODB.tests.util
# Return total number of connections across all pools in a db._pools.
def nconn(pools):
return sum([len(pool.all) for pool in pools.values()])
class DBTests(unittest.TestCase):
class DBTests(ZODB.tests.util.TestCase):
def setUp(self):
self.__path = os.path.abspath('test.fs')
store = ZODB.FileStorage.FileStorage(self.__path)
self.db = ZODB.DB(store)
ZODB.tests.util.TestCase.setUp(self)
self.db = ZODB.DB('test.fs')
def tearDown(self):
self.db.close()
for s in ('', '.index', '.lock', '.tmp'):
if os.path.exists(self.__path+s):
os.remove(self.__path+s)
ZODB.tests.util.TestCase.tearDown(self)
def dowork(self):
c = self.db.open()
......
......@@ -16,7 +16,7 @@ import random
import transaction
from ZODB.DB import DB
from zope.testing import doctest
import zope.testing.setupstack
import ZODB.tests.util
import ZODB.utils
import ZODB.DemoStorage
from ZODB.tests import (
......@@ -44,11 +44,9 @@ class DemoStorageTests(
):
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self._storage = ZODB.DemoStorage.DemoStorage()
def tearDown(self):
self._storage.close()
def checkOversizeNote(self):
# This base class test checks for the common case where a storage
# doesnt support huge transaction metadata. This storage doesnt
......@@ -85,13 +83,13 @@ class DemoStorageTests(
class DemoStorageWrappedBase(DemoStorageTests):
def setUp(self):
import ZODB.DemoStorage
StorageTestBase.StorageTestBase.setUp(self)
self._base = self._makeBaseStorage()
self._storage = ZODB.DemoStorage.DemoStorage(base=self._base)
def tearDown(self):
self._storage.close()
self._base.close()
StorageTestBase.StorageTestBase.tearDown(self)
def _makeBaseStorage(self):
raise NotImplementedError
......@@ -112,8 +110,7 @@ class DemoStorageWrappedAroundFileStorage(DemoStorageWrappedBase):
def setUp(test):
random.seed(0)
zope.testing.setupstack.setUpDirectory(test)
zope.testing.setupstack.register(test, transaction.abort)
ZODB.tests.util.setUp(test)
def testSomeDelegation():
r"""
......@@ -158,11 +155,11 @@ def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite('synchronized.txt'),
doctest.DocTestSuite(
setUp=setUp, tearDown=zope.testing.setupstack.tearDown,
setUp=setUp, tearDown=ZODB.tests.util.tearDown,
),
doctest.DocFileSuite(
'README.txt',
setUp=setUp, tearDown=zope.testing.setupstack.tearDown,
setUp=setUp, tearDown=ZODB.tests.util.tearDown,
),
))
......@@ -172,11 +169,11 @@ def test_suite():
def test_suite():
suite = unittest.TestSuite((
doctest.DocTestSuite(
setUp=setUp, tearDown=zope.testing.setupstack.tearDown,
setUp=setUp, tearDown=ZODB.tests.util.tearDown,
),
doctest.DocFileSuite(
'../DemoStorage.test',
setUp=setUp, tearDown=zope.testing.setupstack.tearDown,
setUp=setUp, tearDown=ZODB.tests.util.tearDown,
),
))
suite.addTest(unittest.makeSuite(DemoStorageTests, 'check'))
......
......@@ -33,12 +33,9 @@ class BaseFileStorageTests(StorageTestBase.StorageTestBase):
**kwargs)
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self.open(create=1)
def tearDown(self):
self._storage.close()
self._storage.cleanup()
class FileStorageTests(
BaseFileStorageTests,
BasicStorage.BasicStorage,
......@@ -296,14 +293,14 @@ class FileStorageRecoveryTest(
):
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
def tearDown(self):
self._storage.close()
self._dst.close()
self._storage.cleanup()
self._dst.cleanup()
StorageTestBase.StorageTestBase.tearDown(self)
def new_dest(self):
return ZODB.FileStorage.FileStorage('Dest.fs')
......@@ -316,24 +313,16 @@ class FileStorageNoRestore(ZODB.FileStorage.FileStorage):
raise Exception
class FileStorageNoRestoreRecoveryTest(
StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage,
):
class FileStorageNoRestoreRecoveryTest(FileStorageRecoveryTest):
# This test actually verifies a code path of
# BaseStorage.copyTransactionsFrom. For simplicity of implementation, we
# use a FileStorage deprived of its restore method.
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self._storage = FileStorageNoRestore("Source.fs", create=True)
self._dst = FileStorageNoRestore("Dest.fs", create=True)
def tearDown(self):
self._storage.close()
self._dst.close()
self._storage.cleanup()
self._dst.cleanup()
def new_dest(self):
return FileStorageNoRestore('Dest.fs')
......
......@@ -40,11 +40,9 @@ class MappingStorageTests(
):
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self, )
self._storage = ZODB.MappingStorage.MappingStorage()
def tearDown(self):
self._storage.close()
def checkOversizeNote(self):
# This base class test checks for the common case where a storage
# doesnt support huge transaction metadata. This storage doesnt
......
......@@ -17,38 +17,38 @@ import base64
import os
import random
import sys
import tempfile
import unittest
import StringIO
import ZODB
import ZODB.tests.util
from ZODB.FileStorage import FileStorage
import ZODB.fsrecover
from persistent.mapping import PersistentMapping
import transaction
class RecoverTest(unittest.TestCase):
class RecoverTest(ZODB.tests.util.TestCase):
level = 2
path = None
def setUp(self):
self.path = tempfile.mktemp(suffix=".fs")
ZODB.tests.util.TestCase.setUp(self)
self.path = 'source.fs'
self.storage = FileStorage(self.path)
self.populate()
self.dest = tempfile.mktemp(suffix=".fs")
self.dest = 'dest.fs'
self.recovered = None
def tearDown(self):
self.storage.close()
if self.recovered is not None:
self.recovered.close()
self.storage.cleanup()
temp = FileStorage(self.dest)
temp.close()
temp.cleanup()
ZODB.tests.util.TestCase.tearDown(self)
def populate(self):
db = ZODB.DB(self.storage)
......
......@@ -20,6 +20,7 @@ import ZODB.MappingStorage
from ZODB.POSException import ReadConflictError, ConflictError
from ZODB.POSException import TransactionFailedError
from ZODB.tests.warnhook import WarningsHook
import ZODB.tests.util
from persistent import Persistent
from persistent.mapping import PersistentMapping
......@@ -38,13 +39,18 @@ class DecoyIndependent(Persistent):
def _p_independent(self):
return 0
class ZODBTests(unittest.TestCase):
class ZODBTests(ZODB.tests.util.TestCase):
def setUp(self):
ZODB.tests.util.TestCase.setUp(self)
self._storage = ZODB.FileStorage.FileStorage(
'ZODBTests.fs', create=1)
self._db = ZODB.DB(self._storage)
def tearDown(self):
self._db.close()
ZODB.tests.util.TestCase.tearDown(self)
def populate(self):
transaction.begin()
conn = self._db.open()
......@@ -56,10 +62,6 @@ class ZODBTests(unittest.TestCase):
transaction.commit()
conn.close()
def tearDown(self):
self._db.close()
self._storage.cleanup()
def checkExportImport(self, abort_it=False):
self.populate()
conn = self._db.open()
......@@ -431,9 +433,10 @@ class ZODBTests(unittest.TestCase):
transaction.abort()
conn.close()
class ReadConflictTests(unittest.TestCase):
class ReadConflictTests(ZODB.tests.util.TestCase):
def setUp(self):
ZODB.tests.utils.TestCase.setUp(self)
self._storage = ZODB.MappingStorage.MappingStorage()
def readConflict(self, shouldFail=True):
......
......@@ -11,17 +11,13 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
r"""
fsdump test
===========
Let's get a temp file path to work with first.
Let's get a path to work with first.
>>> import tempfile
>>> path = tempfile.mktemp('.fs', 'Data')
>>> print 'path:', path #doctest: +ELLIPSIS
path: ...Data...fs
>>> path = 'Data.fs'
More imports.
......@@ -69,10 +65,12 @@ Now we see two transactions and two changed objects.
Clean up.
>>> st.close()
>>> st.cleanup() # remove .fs, .index, etc
"""
from zope.testing import doctest
import zope.testing.setupstack
def test_suite():
return doctest.DocTestSuite()
return doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory,
tearDown=zope.testing.setupstack.tearDown)
......@@ -154,6 +154,7 @@ class MinimalTestSuite(StorageTestBase.StorageTestBase,
):
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self._storage = MinimalMemoryStorage()
# we don't implement undo
......
This diff is collapsed.
......@@ -15,18 +15,19 @@
$Id$
"""
import unittest
from zope.testing import doctest, module, setupstack
from zope.testing import doctest, module
import ZODB.tests.util
def setUp(test):
ZODB.tests.util.setUp(test)
module.setUp(test, 'ConflictResolution_txt')
setupstack.setUpDirectory(test)
def tearDown(test):
test.globs['db'].close()
test.globs['db1'].close()
test.globs['db2'].close()
module.tearDown(test)
setupstack.tearDown(test)
ZODB.tests.util.tearDown(test)
def test_suite():
return unittest.TestSuite((
......
......@@ -16,12 +16,9 @@ r"""
fsoids test, of the workhorse fsoids.Trace class
================================================
Let's get a temp file path to work with first.
Let's get a path to work with first.
>>> import tempfile
>>> path = tempfile.mktemp('.fs', 'Data')
>>> print 'path:', path #doctest: +ELLIPSIS
path: ...Data...fs
>>> path = 'Data.fs'
More imports.
......
......@@ -16,19 +16,21 @@ $Id$
"""
import unittest
from zope.testing import doctest, module
import ZODB.tests.util
def setUp(test):
ZODB.tests.util.setUp(test)
module.setUp(test, 'historical_connections_txt')
def tearDown(test):
test.globs['db'].close()
test.globs['db2'].close()
test.globs['storage'].close()
test.globs['storage'].cleanup()
# the DB class masks the module because of __init__ shenanigans
DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
DB_module.time = test.globs['original_time']
module.tearDown(test)
ZODB.tests.util.tearDown(test)
def test_suite():
return unittest.TestSuite((
......
......@@ -52,14 +52,15 @@ class FakeModule:
def setUp(test):
ZODB.tests.util.setUp(test)
test.globs['some_database'] = ZODB.tests.util.DB()
module = FakeModule('ZODB.persistentclass_txt', test.globs)
sys.modules[module.__name__] = module
def tearDown(test):
transaction.abort()
test.globs['some_database'].close()
del sys.modules['ZODB.persistentclass_txt']
ZODB.tests.util.tearDown(test)
def test_suite():
return unittest.TestSuite((
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment