Commit ff27e154 authored by Jérome Perrin

tests: drop most of the custom test framework

This was probably for Python 2.6 compatibility; nowadays we can use the
standard library's unittest.
parent 72e2b857
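
The skip and expectedFailure helpers that backportUnittest provided have been part of the standard library's unittest since Python 2.7. A minimal sketch of the replacement usage (the test class and test names are hypothetical, for illustration only, not part of this commit):

```python
import unittest

class ExampleTest(unittest.TestCase):  # hypothetical test case, for illustration

  @unittest.skip("demonstrates the stdlib skip decorator")
  def test_skipped(self):
    pass

  @unittest.expectedFailure
  def test_known_bug(self):
    self.assertEqual(1, 2)  # reported as an expected failure, not an error

  def test_conditional_skip(self):
    # raising unittest.SkipTest inside a test also marks it as skipped
    raise unittest.SkipTest("optional dependency not installed")

if __name__ == '__main__':
  unittest.main()
```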
@@ -30,7 +30,7 @@
from os.path import join
from cloudooo.tests.cloudoooTestCase import TestCase
from cloudooo.tests.backportUnittest import skip
from unittest import skip
class TestAllSupportedFormat(TestCase):
......
@@ -46,7 +46,7 @@ from cloudooo.interfaces.monitor import IMonitor
from cloudooo.interfaces.granulate import ITableGranulator, \
IImageGranulator, \
ITextGranulator
from cloudooo.tests.backportUnittest import TestCase, expectedFailure
from unittest import TestCase, expectedFailure
import zope.interface.verify
......
@@ -36,7 +36,7 @@ from StringIO import StringIO
from lxml import etree
from zipfile import ZipFile, is_zipfile
from cloudooo.tests.cloudoooTestCase import TestCase
from cloudooo.tests.backportUnittest import expectedFailure
from unittest import expectedFailure
import magic
from cloudooo.handler.ooo.tests.testOooMimemapper import text_expected_tuple, presentation_expected_tuple
......
# Backport of Python 2.7 unittest chosen parts to be able to use the
# "skip" decorators, and the associated ExpectedFailure and
# UnexpectedSuccess.
#
# Implementation is mostly a direct translation from Python r75708
# grep for "BACK" comments for backport-specific remarks.
import unittest
import sys
import time

class SkipTest(Exception):
  """
  Raise this exception in a test to skip it.
  Usually you can use TestResult.skip() or one of the skipping decorators
  instead of raising this directly.
  """
  pass


class _ExpectedFailure(Exception):
  """
  Raise this when a test is expected to fail.
  This is an implementation detail.
  """

  def __init__(self, exc_info):
    Exception.__init__(self)
    self.exc_info = exc_info


class _UnexpectedSuccess(Exception):
  """
  The test was supposed to fail, but it didn't!
  """
  pass


class SetupSiteError(Exception):
  """
  The ERP5 Site could not have been setup.
  This is raised when the site could not have been created in a previous
  test. We want this to count as an error, but we do not want this to happear
  in traceback for readability.
  """
  pass

def _id(obj):
  return obj


def skip(reason):
  """
  Unconditionally skip a test.
  """
  def decorator(test_item):
    if isinstance(test_item, type) and issubclass(test_item, TestCase):
      test_item.__unittest_skip__ = True
      test_item.__unittest_skip_why__ = reason
      return test_item
    def skip_wrapper(*args, **kwargs):
      raise SkipTest(reason)
    skip_wrapper.__name__ = test_item.__name__
    skip_wrapper.__doc__ = test_item.__doc__
    return skip_wrapper
  return decorator


def skipIf(condition, reason):
  """
  Skip a test if the condition is true.
  """
  if condition:
    return skip(reason)
  return _id


def skipUnless(condition, reason):
  """
  Skip a test unless the condition is true.
  """
  if not condition:
    return skip(reason)
  return _id


def expectedFailure(func):
  def wrapper(*args, **kwargs):
    try:
      func(*args, **kwargs)
    except Exception:
      raise _ExpectedFailure(sys.exc_info())
    raise _UnexpectedSuccess
  wrapper.__name__ = func.__name__
  wrapper.__doc__ = func.__doc__
  return wrapper

class TestCase(unittest.TestCase):
  """We redefine here the run() method, and add a skipTest() method.
  """

  failureException = AssertionError

  def run(self, result=None):
    orig_result = result
    if result is None:
      result = self.defaultTestResult()
      # BACK: Not necessary for Python < 2.7:
      # TestResult.startTestRun does not exist yet
      # startTestRun = getattr(result, 'startTestRun', None)
      # if startTestRun is not None:
      #   startTestRun()

    # BACK: Not needed for Python < 2.7
    # unittest.addCleanup does not exist yet
    # self._resultForDoCleanups = result
    result.startTest(self)
    if getattr(self.__class__, "__unittest_skip__", False):
      # If the whole class was skipped.
      try:
        result.addSkip(self, self.__class__.__unittest_skip_why__)
      finally:
        result.stopTest(self)
      return
    testMethod = getattr(self, self._testMethodName)
    try:
      success = False
      try:
        self.setUp()
      except SkipTest as e:
        result.addSkip(self, str(e))
      except SetupSiteError as e:
        result.errors.append(None)
      except BaseException as e:
        result.addError(self, sys.exc_info())
        if isinstance(e, (KeyboardInterrupt, SystemExit)):
          raise
      else:
        try:
          testMethod()
        except self.failureException:
          result.addFailure(self, sys.exc_info())
        except _ExpectedFailure as e:
          result.addExpectedFailure(self, e.exc_info)
        except _UnexpectedSuccess:
          result.addUnexpectedSuccess(self)
        except SkipTest as e:
          result.addSkip(self, str(e))
        except BaseException as e:
          result.addError(self, sys.exc_info())
          if isinstance(e, (KeyboardInterrupt, SystemExit)):
            raise
        else:
          success = True

        try:
          self.tearDown()
        except BaseException as e:
          result.addError(self, sys.exc_info())
          if isinstance(e, (KeyboardInterrupt, SystemExit)):
            raise
          success = False

      # BACK: Not needed for Python < 2.7
      # unittest.addCleanup does not exist yet
      # cleanUpSuccess = self.doCleanups()
      # success = success and cleanUpSuccess
      if success:
        result.addSuccess(self)
    finally:
      result.stopTest(self)
      # BACK: Not necessary for Python < 2.7
      # TestResult.stopTestRun does not exist yet
      # if orig_result is None:
      #   stopTestRun = getattr(result, 'stopTestRun', None)
      #   if stopTestRun is not None:
      #     stopTestRun()

  def skipTest(self, reason):
    """Skip this test."""
    raise SkipTest(reason)

if not hasattr(unittest.TestResult, 'addSkip'): # BBB: Python < 2.7

  unittest.TestResult._orig_init__ = unittest.TestResult.__init__.im_func

  def __init__(self):
    self._orig_init__()
    self.skipped = []
    self.expectedFailures = []
    self.unexpectedSuccesses = []

  def addSkip(self, test, reason):
    """Called when a test is skipped."""
    self.skipped.append((test, reason))
    if self.showAll:
      self.stream.writeln("skipped %s" % repr(reason))
    elif self.dots:
      self.stream.write("s")
      self.stream.flush()

  def addExpectedFailure(self, test, err):
    """Called when an expected failure/error occured."""
    self.expectedFailures.append(
        (test, self._exc_info_to_string(err, test)))
    if self.showAll:
      self.stream.writeln("expected failure")
    elif self.dots:
      self.stream.write("x")
      self.stream.flush()

  def addUnexpectedSuccess(self, test):
    """Called when a test was expected to fail, but succeed."""
    self.unexpectedSuccesses.append(test)
    if self.showAll:
      self.stream.writeln("unexpected success")
    elif self.dots:
      self.stream.write("u")
      self.stream.flush()

  for f in __init__, addSkip, addExpectedFailure, addUnexpectedSuccess:
    setattr(unittest.TestResult, f.__name__, f)

  def getDescription(self, test):
    doc_first_line = test.shortDescription()
    if self.descriptions and doc_first_line:
      return '\n'.join((str(test), doc_first_line))
    else:
      return str(test)
  unittest._TextTestResult.getDescription = getDescription

class _TextTestResult(unittest._TextTestResult):

  def wasSuccessful(self):
    "Tells whether or not this result was a success"
    return not (self.failures or self.errors or self.unexpectedSuccesses)

  def printErrors(self):
    if self.dots or self.showAll:
      self.stream.writeln()
    # 'None' correspond to redundant errors due to site creation errors,
    # and we do not display them here.
    self.printErrorList('ERROR', filter(None, self.errors))
    self.printErrorList('FAIL', self.failures)
    if self.unexpectedSuccesses:
      self.stream.writeln(self.separator1)
      for test in self.unexpectedSuccesses:
        self.stream.writeln("SUCCESS: %s" % self.getDescription(test))


class TextTestRunner(unittest.TextTestRunner):

  def _makeResult(self):
    return _TextTestResult(self.stream, self.descriptions, self.verbosity)

  def run(self, test):
    result = self._makeResult()
    startTime = time.time()
    # BACK: 2.7 implementation wraps run with result.(start|stop)TestRun
    try:
      test(result)
    except KeyboardInterrupt:
      pass
    stopTime = time.time()
    timeTaken = stopTime - startTime
    result.printErrors()
    self.stream.writeln(result.separator2)
    run = result.testsRun
    self.stream.writeln("Ran %d test%s in %.3fs" %
                        (run, run != 1 and "s" or "", timeTaken))
    self.stream.writeln()
    results = map(len, (result.expectedFailures,
                        result.unexpectedSuccesses,
                        result.skipped))
    expectedFails, unexpectedSuccesses, skipped = results
    infos = []
    if not result.wasSuccessful():
      self.stream.write("FAILED")
      failed, errored = map(len, (result.failures, result.errors))
      if failed:
        infos.append("failures=%d" % failed)
      if errored:
        infos.append("errors=%d" % errored)
    else:
      self.stream.write("OK")
    if skipped:
      infos.append("skipped=%d" % skipped)
    if expectedFails:
      infos.append("expected failures=%d" % expectedFails)
    if unexpectedSuccesses:
      infos.append("unexpected successes=%d" % unexpectedSuccesses)
    if infos:
      self.stream.writeln(" (%s)" % (", ".join(infos),))
    else:
      self.stream.write("\n")
    return result
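
Everything the backportUnittest module above provided exists in the standard library since Python 2.7, which is why the commit can drop the file entirely. A quick sanity check, assuming any Python 2.7+ or 3.x interpreter:

```python
import unittest

# The skip decorators, expectedFailure and SkipTest are provided by unittest itself.
for name in ("skip", "skipIf", "skipUnless", "expectedFailure", "SkipTest"):
  assert hasattr(unittest, name), name

# TestResult grew the hooks that were monkey-patched onto it above.
for name in ("addSkip", "addExpectedFailure", "addUnexpectedSuccess"):
  assert hasattr(unittest.TestResult, name), name
```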
@@ -30,22 +30,21 @@
import unittest
from os import environ, path
from ConfigParser import ConfigParser
from xmlrpclib import ServerProxy, Fault
from configparser import ConfigParser
from xmlrpc.client import ServerProxy, Fault
from magic import Magic
from base64 import encodestring, decodestring
from cloudooo.tests import backportUnittest
from base64 import encodebytes, decodebytes
config = ConfigParser()

def make_suite(test_case):
  """Function is used to run all tests together"""
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(test_case))
  suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(test_case))
  return suite


class TestCase(backportUnittest.TestCase):
class TestCase(unittest.TestCase):

  def setUp(self):
    server_cloudooo_conf = environ.get("server_cloudooo_conf", None)
......
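
Two of the replacements in the hunk above follow standard-library evolution: unittest.makeSuite() is deprecated in favour of the TestLoader API, and base64.encodestring()/decodestring() were renamed to encodebytes()/decodebytes() in Python 3. A minimal sketch with a hypothetical test case (not part of this commit):

```python
import unittest
from base64 import encodebytes, decodebytes  # Python 3 names for encodestring/decodestring

class ExampleTest(unittest.TestCase):  # hypothetical, for illustration only
  def test_roundtrip(self):
    data = b"cloudooo"
    self.assertEqual(decodebytes(encodebytes(data)), data)

# Old, deprecated spelling: suite.addTest(unittest.makeSuite(ExampleTest))
suite = unittest.TestSuite()
suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ExampleTest))
unittest.TextTestRunner(verbosity=2).run(suite)
```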
@@ -2,13 +2,13 @@
import sys
from pkg_resources import resource_filename
import logging
import unittest
from time import sleep
from subprocess import Popen
from ConfigParser import ConfigParser
from argparse import ArgumentParser
from os import chdir, path, environ, curdir, remove
from cloudooo.tests import backportUnittest
from glob import glob
import psutil
from signal import SIGQUIT
@@ -23,10 +23,7 @@ def wait_use_port(pid, timeout_limit=30):
  return False

def exit(msg):
  sys.stderr.write(msg)
  sys.exit(0)

logger = logging.getLogger(__name__)

def run():
  description = "Unit Test Runner for Handlers"
@@ -34,12 +31,18 @@ def run():
  parser.add_argument('server_cloudooo_conf')
  parser.add_argument('test_name')
  parser.add_argument('--timeout_limit', dest='timeout_limit',
                      type=long, default=30,
                      type=int, default=30,
                      help="Timeout to waiting for the cloudooo stop")
  parser.add_argument('--paster_path', dest='paster_path',
                      default='paster',
                      help="Path to Paster script")
  parser.add_argument('-v', '--verbose', action='store_true', help='Enable logging')
  parser.add_argument(
    '-D', '--debug', action='store_true',
    help='Enable pdb on errors/failures') # XXX but does not show test output
  namespace = parser.parse_args()
  if namespace.verbose:
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-8s %(message)s',)
  environment_path = glob(path.join(resource_filename("cloudooo", "handler"), '*', 'tests'))
  sys.path.extend(environment_path)
  server_cloudooo_conf = namespace.server_cloudooo_conf
@@ -50,32 +53,38 @@ def run():
  environ['server_cloudooo_conf'] = server_cloudooo_conf
  paster_path = namespace.paster_path

  python_extension = '.py'
  if test_name[-3:] == python_extension:
    test_name = test_name[:-3]
  handler_path = None
  for env_handler_path in environment_path:
    full_path = path.join(env_handler_path, '%s%s' % (test_name,
                                                      python_extension))
    if path.exists(full_path):
      handler_path = env_handler_path
      break
  if handler_path is None:
    exit("%s does not exists\n" % full_path)

  from cloudooo.tests.handlerTestCase import startFakeEnvironment
  from cloudooo.tests.handlerTestCase import stopFakeEnvironment

  config = ConfigParser()
  config.read(server_cloudooo_conf)
  module = __import__(test_name)
  if namespace.debug:
    # XXX not really correct but enough to get a pdb prompt
    suite = unittest.defaultTestLoader.loadTestsFromName(test_name)
    module = __import__(list(suite)[0].__module__)
    if module is unittest:
      module = __import__(list(list(suite)[0])[0].__module__)
  else:
    module = __import__(test_name)
    suite = unittest.defaultTestLoader.loadTestsFromModule(module)
  handler_path = path.dirname(module.__file__)
  DAEMON = getattr(module, 'DAEMON', False)
  OPENOFFICE = getattr(module, 'OPENOFFICE', False)
  TestRunner = backportUnittest.TextTestRunner
  suite = unittest.defaultTestLoader.loadTestsFromModule(module)

  def run_suite():
    if namespace.debug:
      import functools
      suite.run = functools.partial(suite.run, debug=True)
    try:
      unittest.TextTestRunner(
        verbosity=2,
        warnings=None if sys.warnoptions else 'default',
      ).run(suite)
    except:
      import pdb; pdb.post_mortem()
      raise

  if DAEMON:
    log_file = '%s/cloudooo_test.log' % config.get('app:main',
@@ -85,20 +94,24 @@ def run():
    command = [paster_path, 'serve', '--log-file', log_file,
               server_cloudooo_conf]
    process = Popen(command)
    logger.debug("Started daemon %s", command)
    wait_use_port(process.pid)
    logger.debug("Daemon ready")
    chdir(handler_path)
    try:
      TestRunner(verbosity=2).run(suite)
      run_suite()
    finally:
      process.send_signal(SIGQUIT)
      process.wait()
  elif OPENOFFICE:
    chdir(handler_path)
    logger.debug("Starting fake environment")
    startFakeEnvironment(conf_path=server_cloudooo_conf)
    logger.debug("Fake environment ready")
    try:
      TestRunner(verbosity=2).run(suite)
      run_suite()
    finally:
      stopFakeEnvironment()
  else:
    chdir(handler_path)
    TestRunner(verbosity=2).run(suite)
    run_suite()
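
The -D/--debug path above relies on TestSuite.run() accepting a debug flag: with debug=True the suite runs each test through its debug() method, so exceptions propagate out of the runner instead of being collected in the TestResult, and pdb.post_mortem() can inspect them. A self-contained sketch of that pattern (the failing test is hypothetical, only to trigger the debugger):

```python
import functools
import pdb
import unittest

class Failing(unittest.TestCase):  # hypothetical test, for illustration only
  def test_boom(self):
    raise ValueError("boom")

suite = unittest.defaultTestLoader.loadTestsFromTestCase(Failing)
# Force debug mode: TestSuite.run(result, debug=True) runs each test via its
# debug() method, letting the exception escape the runner.
suite.run = functools.partial(suite.run, debug=True)
try:
  unittest.TextTestRunner(verbosity=2).run(suite)
except Exception:
  pdb.post_mortem()  # drop into the debugger at the original point of failure
  raise
```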