Commit a4b9816a authored by Sebastien Robin's avatar Sebastien Robin

This is mostly a duplicate of the products/ERP5/bin/run_test_suite

script made by Julien.

There is following modifications :
- code is moved to ERP5Type/tests
- updater is not kept since the update of code will be
  managed at upper level (erp5 testnode)
- use argparse instead of optparse

runTestSuite then will be called by erp5 testnode or can
be called manually

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@45579 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 923c2b8b
No related merge requests found
import threading
class DummyTaskDistributionTool(object):
def __init__(self):
self.lock = threading.Lock()
def createTestResult(self, name, revision, test_name_list, allow_restart):
self.test_name_list = list(test_name_list)
return None, revision
def updateTestResult(self, name, revision, test_name_list):
self.test_name_list = list(test_name_list)
return None, revision
def startUnitTest(self, test_result_path, exclude_list=()):
self.lock.acquire()
try:
for i, test in enumerate(self.test_name_list):
if test not in exclude_list:
del self.test_name_list[i]
return None, test
finally:
self.lock.release()
def stopUnitTest(self, test_path, status_dict):
pass
\ No newline at end of file
import re, imp, sys, threading, os, shlex, subprocess, shutil
# The content of this file might be partially moved to an egg
# in order to allows parallel tests without the code of ERP5
_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'"))
def format_command(*args, **kw):
cmdline = []
for k, v in sorted(kw.items()):
if _format_command_search(v):
v = _format_command_escape(v)
cmdline.append('%s=%s' % (k, v))
for v in args:
if _format_command_search(v):
v = _format_command_escape(v)
cmdline.append(v)
return ' '.join(cmdline)
def subprocess_capture(p, quiet=False):
def readerthread(input, output, buffer):
while True:
data = input.readline()
if not data:
break
output(data)
buffer.append(data)
if p.stdout:
stdout = []
output = quiet and (lambda data: None) or sys.stdout.write
stdout_thread = threading.Thread(target=readerthread,
args=(p.stdout, output, stdout))
stdout_thread.setDaemon(True)
stdout_thread.start()
if p.stderr:
stderr = []
stderr_thread = threading.Thread(target=readerthread,
args=(p.stderr, sys.stderr.write, stderr))
stderr_thread.setDaemon(True)
stderr_thread.start()
if p.stdout:
stdout_thread.join()
if p.stderr:
stderr_thread.join()
p.wait()
return (p.stdout and ''.join(stdout),
p.stderr and ''.join(stderr))
class Persistent(object):
"""Very simple persistent data storage for optimization purpose
This tool should become a standalone daemon communicating only with an ERP5
instance. But for the moment, it only execute 1 test suite and exists,
and test suite classes may want some information from previous runs.
"""
def __init__(self, filename):
self._filename = filename
def __getattr__(self, attr):
if attr == '_db':
try:
db = file(self._filename, 'r+')
except IOError, e:
if e.errno != errno.ENOENT:
raise
db = file(self._filename, 'w+')
else:
try:
self.__dict__.update(eval(db.read()))
except StandardError:
pass
self._db = db
return db
self._db
return super(Persistent, self).__getattribute__(attr)
def sync(self):
self._db.seek(0)
db = dict(x for x in self.__dict__.iteritems() if x[0][:1] != '_')
pprint.pprint(db, self._db)
self._db.truncate()
class TestSuite(object):
mysql_db_count = 1
allow_restart = False
realtime_output = True
stdin = file(os.devnull)
def __init__(self, max_instance_count, **kw):
self.__dict__.update(kw)
self._path_list = ['tests']
pool = threading.Semaphore(max_instance_count)
self.acquire = pool.acquire
self.release = pool.release
self._instance = threading.local()
self._pool = max_instance_count == 1 and [None] or \
range(1, max_instance_count + 1)
self._ready = set()
self.running = {}
if max_instance_count != 1:
self.realtime_output = False
elif os.isatty(1):
self.realtime_output = True
self.persistent = Persistent('run_test_suite-%s.tmp'
% self.__class__.__name__)
instance = property(lambda self: self._instance.id)
def start(self, test, on_stop=None):
assert test not in self.running
self.running[test] = instance = self._pool.pop(0)
def run():
self._instance.id = instance
if instance not in self._ready:
self._ready.add(instance)
self.setup()
status_dict = self.run(test)
if on_stop is not None:
on_stop(status_dict)
self._pool.append(self.running.pop(test))
self.release()
thread = threading.Thread(target=run)
thread.setDaemon(True)
thread.start()
def update(self):
self.checkout() # by default, update everything
def setup(self):
pass
def run(self, test):
raise NotImplementedError
def getTestList(self):
raise NotImplementedError
def spawn(self, *args, **kw):
quiet = kw.pop('quiet', False)
env = kw and dict(os.environ, **kw) or None
command = format_command(*args, **kw)
print '\n$ ' + command
sys.stdout.flush()
p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env)
if self.realtime_output:
stdout, stderr = subprocess_capture(p, quiet)
else:
stdout, stderr = p.communicate()
if not quiet:
sys.stdout.write(stdout)
sys.stderr.write(stderr)
result = dict(status_code=p.returncode, command=command,
stdout=stdout, stderr=stderr)
if p.returncode:
raise SubprocessError(result)
return result
class ERP5TypeTestSuite(TestSuite):
RUN_RE = re.compile(
r'Ran (?P<all_tests>\d+) tests? in (?P<seconds>\d+\.\d+)s',
re.DOTALL)
STATUS_RE = re.compile(r"""
(OK|FAILED)\s+\(
(failures=(?P<failures>\d+),?\s*)?
(errors=(?P<errors>\d+),?\s*)?
(skipped=(?P<skips>\d+),?\s*)?
(expected\s+failures=(?P<expected_failures>\d+),?\s*)?
(unexpected\s+successes=(?P<unexpected_successes>\d+),?\s*)?
\)
""", re.DOTALL | re.VERBOSE)
def setup(self):
instance_home = self.instance and 'unit_test.%u' % self.instance \
or 'unit_test'
tests = os.path.join(instance_home, 'tests')
if os.path.exists(tests):
shutil.rmtree(instance_home + '.previous', True)
shutil.move(tests, instance_home + '.previous')
def run(self, test):
return self.runUnitTest(test)
def runUnitTest(self, *args, **kw):
if self.instance:
args = ('--instance_home=unit_test.%u' % self.instance,) + args
instance_number = self.instance or 1
mysql_db_list = self.mysql_db_list[
(instance_number-1) * self.mysql_db_count:
(instance_number) * self.mysql_db_count]
if len(mysql_db_list) > 1:
kw['extra_sql_connection_string_list'] = ','.join(mysql_db_list[1:])
try:
runUnitTest = os.environ.get('RUN_UNIT_TEST',
'runUnitTest')
args = tuple(shlex.split(runUnitTest)) \
+ ('--verbose', '--erp5_sql_connection_string=' + mysql_db_list[0]) \
+ args
status_dict = self.spawn(*args, **kw)
except SubprocessError, e:
status_dict = e.status_dict
test_log = status_dict['stderr']
search = self.RUN_RE.search(test_log)
if search:
groupdict = search.groupdict()
status_dict.update(duration=float(groupdict['seconds']),
test_count=int(groupdict['all_tests']))
search = self.STATUS_RE.search(test_log)
if search:
groupdict = search.groupdict()
status_dict.update(error_count=int(groupdict['errors'] or 0),
failure_count=int(groupdict['failures'] or 0),
skip_count=int(groupdict['skips'] or 0)
+int(groupdict['expected_failures'] or 0)
+int(groupdict['unexpected_successes'] or 0))
return status_dict
class SubprocessError(EnvironmentError):
def __init__(self, status_dict):
self.status_dict = status_dict
def __getattr__(self, name):
return self.status_dict[name]
def __str__(self):
return 'Error %i' % self.status_code
sys.modules['test_suite'] = module = imp.new_module('test_suite')
for var in SubprocessError, TestSuite, ERP5TypeTestSuite:
setattr(module, var.__name__, var)
\ No newline at end of file
#!/usr/bin/python2.6
import argparse, pprint, socket, sys, time, xmlrpclib
from DummyTaskDistributionTool import DummyTaskDistributionTool
from ERP5TypeTestSuite import ERP5TypeTestSuite
def makeSuite(node_quantity=None, test_suite=None, revision=None,
db_list=None):
for k in sys.modules.keys():
if k == 'tests' or k.startswith('tests.'):
del sys.modules[k]
module_name, class_name = ('tests.' + \
test_suite).rsplit('.', 1)
try:
suite_class = getattr(__import__(module_name, None, None, [class_name]),
class_name)
except (AttributeError, ImportError):
raise
suite = suite_class(revision=revision,
max_instance_count=node_quantity,
mysql_db_list=db_list.split(','),
)
return suite
def safeRpcCall(function, *args):
retry = 64
while True:
try:
return function(*args)
except (socket.error, xmlrpclib.ProtocolError), e:
print >>sys.stderr, e
pprint.pprint(args, file(function._Method__name, 'w'))
time.sleep(retry)
retry += retry >> 1
def main():
parser = argparse.ArgumentParser(description='Run a test suite.')
parser.add_argument('--test_suite', help='The test suite name')
parser.add_argument('--revision', help='The revision to test',
default='dummy_revision')
parser.add_argument('--node_quantity', help='Number of parallel tests to run',
default=1, type=int)
parser.add_argument('--master_url',
help='The Url of Master controling many suites',
default=None)
parser.add_argument('--db_list', help='A list of sql connection strings')
# parameters that needs to be passed to runUnitTest
parser.add_argument('--conversion_server_hostname', default=None)
parser.add_argument('--conversion_server_port', default=None)
parser.add_argument('--volatile_memcached_server_hostname', default=None)
parser.add_argument('--volatile_memcached_server_port', default=None)
parser.add_argument('--persistent_memcached_server_hostname', default=None)
parser.add_argument('--persistent_memcached_server_port', default=None)
args = parser.parse_args()
if args.master_url is not None:
master_url = args.master_url
if master_url[-1] != '/':
master_url += '/'
master = xmlrpclib.ServerProxy("%s%s" %
(master_url, 'portal_task_distribution'),
allow_none=1)
assert master.getProtocolRevision() == 1
else:
master = DummyTaskDistributionTool()
revision = args.revision
if ',' in revision:
revision = revision.split(',')
suite = makeSuite(test_suite=args.test_suite,
node_quantity=args.node_quantity,
revision=revision,
db_list=args.db_list)
test_result = safeRpcCall(master.createTestResult,
args.test_suite, revision, suite.getTestList(),
suite.allow_restart)
if test_result:
test_result_path, test_revision = test_result
while suite.acquire():
test = safeRpcCall(master.startUnitTest, test_result_path,
suite.running.keys())
if test:
suite.start(test[1], lambda status_dict, __test_path=test[0]:
safeRpcCall(master.stopUnitTest, __test_path, status_dict))
elif not suite.running:
break
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment