Commit c39caeaa authored by Julien Muchembled's avatar Julien Muchembled

testnode: more code clean up

parent 9669729e
import unittest
from unittest import TestCase
from erp5.util.testnode.testnode import TestNode
from erp5.util.testnode.testnode import TestNode, test_type_registry
from erp5.util.testnode.NodeTestSuite import SlapOSInstance, NodeTestSuite
from erp5.util.testnode.ProcessManager import ProcessManager, SubprocessError
from erp5.util.testnode.Updater import Updater
from erp5.util.testnode.SlapOSMasterCommunicator import SlapOSMasterCommunicator
from erp5.util.testnode.SlapOSControler import SlapOSControler
from erp5.util.testnode.UnitTestRunner import UnitTestRunner
from erp5.util.testnode.ScalabilityTestRunner import ScalabilityTestRunner
from erp5.util.testnode.SlapOSControler import createFolder
from erp5.util.taskdistribution import TaskDistributor
......@@ -56,14 +54,6 @@ class ERP5TestNode(TestCase):
print "TESTNODE LOG : %r, %r" % (arg, kw)
self.log = log
def returnGoodClassRunner(self, test_type):
if test_type == 'UnitTest':
return UnitTestRunner
elif test_type == 'ScalabilityTest':
return ScalabilityTestRunner
else:
raise NotImplementedError
def tearDown(self):
shutil.rmtree(self._temp_dir, True)
......@@ -127,7 +117,7 @@ class ERP5TestNode(TestCase):
"""
Update from zero/Regenerate the testsuite
"""
node_test_suite.edit(working_directory=self.working_directory,
node_test_suite.edit(
**self.getTestSuiteData(add_third_repository=add_third_repository,
add_broken_repository=add_broken_repository)[0])
......@@ -188,8 +178,9 @@ class ERP5TestNode(TestCase):
node_test_suite = test_node.getNodeTestSuite('foo')
self.assertEquals(0, node_test_suite.retry_software_count)
node_test_suite.retry_software_count = 2
self.assertIs(node_test_suite, test_node.getNodeTestSuite('foo'))
self.assertEquals(2, node_test_suite.retry_software_count)
node_test_suite = test_node.delNodeTestSuite('foo')
del test_node.node_test_suite_dict['foo']
node_test_suite = test_node.getNodeTestSuite('foo')
self.assertEquals(0, node_test_suite.retry_software_count)
......@@ -199,7 +190,6 @@ class ERP5TestNode(TestCase):
"""
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
node_test_suite.edit(working_directory=self.working_directory)
self.assertEquals("%s/foo" % self.working_directory,
node_test_suite.working_directory)
self.assertEquals("%s/foo/test_suite" % self.working_directory,
......@@ -471,16 +461,16 @@ shared = true
test_node = self.getTestNode()
test_suite_data = self.getTestSuiteData(add_third_repository=True)
self.assertEquals([], os.listdir(self.working_directory))
test_node.checkOldTestSuite(test_suite_data)
test_node.purgeOldTestSuite(test_suite_data)
self.assertEquals([], os.listdir(self.working_directory))
os.mkdir(os.path.join(self.working_directory, 'foo'))
self.assertEquals(['foo'], os.listdir(self.working_directory))
test_node.checkOldTestSuite(test_suite_data)
test_node.purgeOldTestSuite(test_suite_data)
self.assertEquals(['foo'], os.listdir(self.working_directory))
os.mkdir(os.path.join(self.working_directory, 'bar'))
self.assertEquals(set(['bar','foo']),
set(os.listdir(self.working_directory)))
test_node.checkOldTestSuite(test_suite_data)
test_node.purgeOldTestSuite(test_suite_data)
self.assertEquals(['foo'], os.listdir(self.working_directory))
def test_09_runTestSuite(self, my_test_type='UnitTest'):
......@@ -499,15 +489,13 @@ shared = true
test_node = self.getTestNode()
test_node.process_manager.spawn = spawn
RunnerClass = self.returnGoodClassRunner(my_test_type)
runner = self.returnGoodClassRunner(my_test_type)(test_node)
slapos_controler = runner._getSlapOSControler(self.working_directory)
runner = test_type_registry[my_test_type](test_node)
# Create and initialise/regenerate a nodetestsuite
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite)
node_test_suite.revision_list = ('dummy', (0, '')),
path = slapos_controler.instance_root + '/a/bin/runTestSuite'
path = runner.getInstanceRoot(node_test_suite) + '/a/bin/runTestSuite'
os.makedirs(os.path.dirname(path))
os.close(os.open(path, os.O_CREAT))
......@@ -542,11 +530,9 @@ shared = true
def test_10_prepareSlapOS(self, my_test_type='UnitTest'):
test_node = self.getTestNode()
test_node_slapos = SlapOSInstance()
RunnerClass = self.returnGoodClassRunner(my_test_type)
runner = RunnerClass(test_node)
test_node_slapos = SlapOSInstance(self.slapos_directory)
runner = test_type_registry[my_test_type](test_node)
node_test_suite = test_node.getNodeTestSuite('foo')
node_test_suite.edit(working_directory=self.working_directory)
status_dict = {"status_code" : 0}
global call_list
call_list = []
......@@ -660,7 +646,7 @@ shared = true
original_sleep = time.sleep
time.sleep = doNothing
self.generateTestRepositoryList()
RunnerClass = self.returnGoodClassRunner(my_test_type)
RunnerClass = test_type_registry[my_test_type]
# Patch
if my_test_type == "ScalabilityTest":
original_getSlaposAccountKey = TaskDistributor.getSlaposAccountKey
......@@ -742,7 +728,6 @@ shared = true
def test_14_createFolder(self):
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
node_test_suite.edit(working_directory=self.working_directory)
folder = node_test_suite.test_suite_directory
self.assertEquals(False, os.path.exists(folder))
createFolder(folder)
......@@ -818,7 +803,7 @@ shared = true
self.assertEquals(1, len([x for x in suite_log.readlines() \
if x.find("Activated logfile")>=0]))
RunnerClass = self.returnGoodClassRunner(my_test_type)
RunnerClass = test_type_registry[my_test_type]
original_sleep = time.sleep
time.sleep = doNothing
self.generateTestRepositoryList()
......@@ -938,8 +923,7 @@ shared = true
SlapOSControler.initializeSlapOSControler
initial_runSoftwareRelease = SlapOSControler.runSoftwareRelease
test_node = self.getTestNode()
RunnerClass = self.returnGoodClassRunner(my_test_type)
runner = RunnerClass(test_node)
runner = test_type_registry[my_test_type](test_node)
node_test_suite = test_node.getNodeTestSuite('foo')
init_call_kw_list = []
def initializeSlapOSControler(self, **kw):
......@@ -1045,7 +1029,7 @@ shared = true
os.makedirs(test_result_path_root)
self.generateTestRepositoryList()
# Select the good runner to modify
RunnerClass = self.returnGoodClassRunner('ScalabilityTest')
RunnerClass = test_type_registry['ScalabilityTest']
# Patch methods
original_sleep = time.sleep
original_getSlaposAccountKey = TaskDistributor.getSlaposAccountKey
......
......@@ -37,38 +37,28 @@ class SlapOSInstance(object):
Base of an software instance,
store variables used during software installation
"""
def __init__(self):
def __init__(self, working_directory):
self.retry_software_count = 0
self.retry = False
self.working_directory = working_directory
def edit(self, **kw):
self.__dict__.update(**kw)
self._checkData()
def _checkData(self):
pass
class NodeTestSuite(SlapOSInstance):
"""
"""
def __init__(self, reference):
super(NodeTestSuite, self).__init__()
def __init__(self, reference, working_directory):
d = os.path.join(working_directory, reference)
super(NodeTestSuite, self).__init__(d)
self.reference = reference
self.cluster_configuration = {}
self.test_suite_directory = os.path.join(d, 'test_suite')
self.custom_profile_path = os.path.join(d, 'software.cfg')
createFolder(d)
def _checkData(self):
if getattr(self, "working_directory", None) is not None:
if not(self.working_directory.endswith(os.path.sep + self.reference)):
self.working_directory = os.path.join(self.working_directory,
self.reference)
createFolder(self.working_directory)
self.test_suite_directory = os.path.join(
self.working_directory, "test_suite")
self.custom_profile_path = os.path.join(self.working_directory,
'software.cfg')
if getattr(self, "vcs_repository_list", None) is not None:
for vcs_repository in self.vcs_repository_list:
def edit(self, **kw):
self.__dict__.update(**kw)
for vcs_repository in kw.get('vcs_repository_list', ()):
buildout_section_id = vcs_repository.get('buildout_section_id')
repository_id = buildout_section_id or \
vcs_repository.get('url').split('/')[-1].split('.')[0]
......
......@@ -99,12 +99,6 @@ class SlapOSControler(object):
else:
raise ValueError("Configuration file not found.")
def getInstanceRequestedState(self, reference):
try:
return self.instance_config[reference]['requested_state']
except Exception:
raise ValueError("Instance '%s' not exist" %self.instance_config[reference])
def request(self, reference, software_url, software_type=None,
software_configuration=None, computer_guid=None, state='started'):
"""
......@@ -219,7 +213,7 @@ class SlapOSControler(object):
self.software_path_list = software_path_list
self.log('SlapOSControler, initialize, reset_software: %r', reset_software)
config = self.config
slapos_config_dict = self.config.copy()
slapos_config_dict = config.copy()
slapos_config_dict.update(software_root=self.software_root,
instance_root=self.instance_root,
proxy_database=self.proxy_database)
......
......@@ -102,7 +102,7 @@ class UnitTestRunner(object):
# report-url, report-project and suite-url are required to seleniumrunner
# instance. This is a hack which must be removed.
config = self.testnode.config
return self._prepareSlapOS(config['slapos_directory'],
return self._prepareSlapOS(test_node_slapos.working_directory,
test_node_slapos, self.testnode.log, create_partition=0,
software_path_list=config.get("software_list"),
cluster_configuration={
......@@ -120,14 +120,18 @@ class UnitTestRunner(object):
software_path_list=[node_test_suite.custom_profile_path],
cluster_configuration={'_': json.dumps(node_test_suite.cluster_configuration)})
def getInstanceRoot(self, node_test_suite):
return self._getSlapOSControler(
node_test_suite.working_directory).instance_root
def runTestSuite(self, node_test_suite, portal_url, log=None):
config = self.testnode.config
slapos_controler = self._getSlapOSControler(self.testnode.working_directory)
run_test_suite_path_list = sorted(glob.glob("%s/*/bin/runTestSuite" % \
slapos_controler.instance_root))
if not len(run_test_suite_path_list):
run_test_suite_path_list = glob.glob(
self.getInstanceRoot(node_test_suite) + "/*/bin/runTestSuite")
try:
run_test_suite_path = min(run_test_suite_path_list)
except ValueError:
raise ValueError('No runTestSuite provided in installed partitions.')
run_test_suite_path = run_test_suite_path_list[0]
# Deal with Shebang size limitation
invocation_list = dealShebang(run_test_suite_path)
invocation_list += (run_test_suite_path,
......
......@@ -29,14 +29,8 @@ import argparse
import logging
import logging.handlers
import os
import pkg_resources
from testnode import TestNode
CONFIG = {
'computer_id': 'COMPUTER',
'partition_reference': 'test0',
}
from .testnode import TestNode
def main(*args):
parser = argparse.ArgumentParser()
......@@ -54,6 +48,10 @@ def main(*args):
logging.basicConfig(level=logging.INFO,
format=logger_format)
logger = logging.getLogger('erp5testnode')
CONFIG = {
'logger': logger.info,
'partition_reference': 'test0',
}
if parsed_argument.console or parsed_argument.logfile:
if parsed_argument.console:
logger.addHandler(logging.StreamHandler())
......@@ -64,11 +62,10 @@ def main(*args):
maxBytes=20000000, backupCount=4)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.info('Activated logfile %r output' % parsed_argument.logfile)
logger.info('Activated logfile %r output', parsed_argument.logfile)
CONFIG['log_file'] = parsed_argument.logfile
else:
logger.addHandler(logging.NullHandler())
CONFIG['logger'] = logger.info
config = ConfigParser.SafeConfigParser()
# do not change case of option keys
config.optionxform = str
......
......@@ -53,6 +53,11 @@ class DummyLogger(object):
'critical', 'fatal'):
setattr(self, name, func)
test_type_registry = {
'UnitTest': UnitTestRunner,
'ScalabilityTest': ScalabilityTestRunner,
}
class TestNode(object):
def __init__(self, log, config, max_log_time=MAX_LOG_TIME,
......@@ -68,36 +73,29 @@ class TestNode(object):
self.max_temp_time = max_temp_time
self.url_access = "https://[0::0]:0123" # Ipv6 + port of the node
def checkOldTestSuite(self,test_suite_data):
installed_reference_set = set(os.listdir(self.working_directory))
wished_reference_set = set([x['test_suite_reference'] for x in test_suite_data])
to_remove_reference_set = installed_reference_set.difference(
wished_reference_set)
for y in to_remove_reference_set:
fpath = os.path.join(self.working_directory,y)
self.delNodeTestSuite(y)
self.log("testnode.checkOldTestSuite, DELETING : %r", fpath)
def purgeOldTestSuite(self, test_suite_data):
reference_set = set(os.listdir(self.working_directory))
reference_set.difference_update(x['test_suite_reference']
for x in test_suite_data)
for reference in reference_set:
fpath = os.path.join(self.working_directory, reference)
self.node_test_suite_dict.pop(reference, None)
self.log("testnode.purgeOldTestSuite, DELETING : %r", fpath)
if os.path.isdir(fpath):
shutil.rmtree(fpath)
else:
os.remove(fpath)
def getNodeTestSuite(self, reference):
node_test_suite = self.node_test_suite_dict.get(reference)
if node_test_suite is None:
node_test_suite = NodeTestSuite(reference)
self.node_test_suite_dict[reference] = node_test_suite
node_test_suite.edit(
log=self.log,
config=self.config,
process_manager=self.process_manager)
try:
node_test_suite = self.node_test_suite_dict[reference]
except KeyError:
node_test_suite = self.node_test_suite_dict[reference] = \
NodeTestSuite(reference, self.working_directory)
config = self.config
node_test_suite.edit(log_directory=config['log_directory'])
return node_test_suite
def delNodeTestSuite(self, reference):
self.node_test_suite_dict.pop(reference, None)
def constructProfile(self, node_test_suite, test_type, use_relative_path=False):
assert len(node_test_suite.vcs_repository_list), "we must have at least one repository"
software_config_path = None
......@@ -129,6 +127,10 @@ class TestNode(object):
node_test_suite.reference)
repository_path = os.path.relpath(repository_path, from_path)
# XXX: Like in run(), code depending on specific test type must be
# moved to the test type classes. In particular, the use of a
# replacement pattern ('<obfuscated_url>') is ugly: buildout
# has cleaner ways to do that.
if test_type=="ScalabilityTest":
# <obfuscated_url> word is modified by in runner.prepareSlapOSForTestSuite()
profile_content_list.append("""
......@@ -159,7 +161,6 @@ shared = true
f.write("[buildout]\nextends = %s\n%s" % (
software_config_path,
''.join(profile_content_list)))
sys.path.append(repository_path)
def updateRevisionList(self, node_test_suite):
config = self.config
......@@ -195,12 +196,9 @@ shared = true
test_result.reportStatus('LOG url', "%s/%s" % (self.config.get('httpd_url'),
folder_id), '')
self.log("going to switch to log %r", suite_log_path)
self.process_manager.log = self.log = self.getSuiteLog()
self.process_manager.log = self.log = self.suite_log
return suite_log_path
def getSuiteLog(self):
return self.suite_log
def _initializeSuiteLog(self, suite_log_path):
# remove previous handlers
logger = logging.getLogger('testsuite')
......@@ -281,8 +279,8 @@ shared = true
def run(self):
log = self.log
config = self.config
test_node_slapos = SlapOSInstance()
test_node_slapos.edit(working_directory=config['slapos_directory'])
portal_url = config['test_suite_master_url']
test_node_slapos = SlapOSInstance(config['slapos_directory'])
try:
while True:
test_result = None
......@@ -291,12 +289,12 @@ shared = true
self.log = self.process_manager.log = self.testnode_log
self.cleanUp()
begin = time.time()
portal_url = config['test_suite_master_url']
self.taskdistribution = taskdistribution.TaskDistributor(
portal_url,
logger=DummyLogger(log))
node_configuration = self.taskdistribution.subscribeNode(node_title=config['test_node_title'],
computer_guid=config['computer_id'])
taskdistributor = taskdistribution.TaskDistributor(
portal_url, logger=DummyLogger(log))
self.test_suite_portal = taskdistributor # XXX ScalabilityTest
node_configuration = taskdistributor.subscribeNode(
node_title=config['test_node_title'],
computer_guid=config['computer_id'])
if type(node_configuration) is str:
# Backward compatiblity
node_configuration = json.loads(node_configuration)
......@@ -307,9 +305,9 @@ shared = true
log('Received and using process timeout from master: %i',
process_timeout)
self.process_manager.max_timeout = process_timeout
test_suite_data = self.taskdistribution.startTestSuite(
node_title=config['test_node_title'],
computer_guid=config['computer_id'])
test_suite_data = taskdistributor.startTestSuite(
node_title=config['test_node_title'],
computer_guid=config['computer_id'])
if type(test_suite_data) is str:
# Backward compatiblity
test_suite_data = json.loads(test_suite_data)
......@@ -317,40 +315,27 @@ shared = true
log("Got following test suite data from master : %r",
test_suite_data)
try:
my_test_type = self.taskdistribution.getTestType()
my_test_type = taskdistributor.getTestType()
except Exception:
log("testnode, error during requesting getTestType() method"
" from the distributor.")
raise
# Select runner according to the test type
if my_test_type == 'UnitTest':
runner = UnitTestRunner(self)
elif my_test_type == 'ScalabilityTest':
runner = ScalabilityTestRunner(self)
else:
try:
runner_class = test_type_registry[my_test_type]
except KeyError:
log("testnode, Runner type %s not implemented.", my_test_type)
raise NotImplementedError
runner = runner_class(self)
log("Type of current test is %s", my_test_type)
# master testnode gets test_suites, slaves get nothing
runner.prepareSlapOSForTestNode(test_node_slapos)
# Clean-up test suites
self.checkOldTestSuite(test_suite_data)
self.purgeOldTestSuite(test_suite_data)
for test_suite in test_suite_data:
node_test_suite = self.getNodeTestSuite(
test_suite["test_suite_reference"])
node_test_suite.edit(
working_directory=config['working_directory'],
log_directory=config['log_directory'])
test_suite.pop("test_suite_reference"))
node_test_suite.edit(**test_suite)
if my_test_type == 'UnitTest':
runner = UnitTestRunner(node_test_suite)
elif my_test_type == 'ScalabilityTest':
runner = ScalabilityTestRunner(self)
else:
log("testnode, Runner type %s not implemented.", my_test_type)
raise NotImplementedError
# XXX: temporary hack to prevent empty test_suite
if not hasattr(node_test_suite, 'test_suite'):
......@@ -359,11 +344,7 @@ shared = true
self.process_manager.killPreviousRun()
if not self.updateRevisionList(node_test_suite):
continue
# Write our own software.cfg to use the local repository
self.constructProfile(node_test_suite, my_test_type,
runner.getRelativePathUsage())
# Make sure we have local repository
test_result = self.taskdistribution.createTestResult(
test_result = taskdistributor.createTestResult(
node_test_suite.revision, [],
config['test_node_title'], False,
node_test_suite.test_suite_title,
......@@ -375,17 +356,23 @@ shared = true
node_test_suite.edit(test_result=test_result)
# get cluster configuration for this test suite, this is needed to
# know slapos parameters to user for creating instances
log("Getting configuration from test suite %s", node_test_suite.test_suite_title)
generated_config = self.taskdistribution.generateConfiguration(node_test_suite.test_suite_title)
log("Getting configuration from test suite %s",
node_test_suite.test_suite_title)
generated_config = taskdistributor.generateConfiguration(
node_test_suite.test_suite_title)
json_data = json.loads(generated_config)
cluster_configuration = deunicodeData(json_data['configuration_list'][0])
node_test_suite.edit(cluster_configuration=cluster_configuration)
# Now prepare the installation of SlapOS and create instance
self.constructProfile(node_test_suite, my_test_type,
runner.getRelativePathUsage())
status_dict = runner.prepareSlapOSForTestSuite(node_test_suite)
# Give some time so computer partitions may start
# as partitions can be of any kind we have and likely will never have
# a reliable way to check if they are up or not ...
time.sleep(20)
# XXX: Do not switch according to the test type. IOW, the
# following code must be moved to the test type class.
if my_test_type == 'UnitTest':
runner.runTestSuite(node_test_suite, portal_url)
elif my_test_type == 'ScalabilityTest':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment