Commit 011f9128 authored by Julien Muchembled's avatar Julien Muchembled Committed by Klaus Wölfel

testnode: simplify logging

Conflicts:
	erp5/util/testnode/ProcessManager.py
parent d9a5ecd8
import unittest
from unittest import TestCase
from contextlib import contextmanager
from erp5.util.testnode import logger
from erp5.util.testnode.testnode import TestNode, test_type_registry
from erp5.util.testnode.NodeTestSuite import SlapOSInstance, NodeTestSuite
from erp5.util.testnode.ProcessManager import ProcessManager, SubprocessError
......@@ -12,17 +14,27 @@ from erp5.util.testnode.SlapOSControler import createFolder
from erp5.util.taskdistribution import TaskDistributor
from erp5.util.taskdistribution import TestResultProxy
import argparse
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import json
import time
import types
import re
@contextmanager
def dummySuiteLog(_):
yield
class ERP5TestNode(TestCase):
_handler = logging.StreamHandler(sys.stdout)
_handler.setFormatter(logging.Formatter('TESTNODE LOG: %(message)s'))
def setUp(self):
self._temp_dir = tempfile.mkdtemp()
self.working_directory = os.path.join(self._temp_dir, 'testnode')
......@@ -48,13 +60,11 @@ class ERP5TestNode(TestCase):
os.mkdir(self.remote_repository0)
os.mkdir(self.remote_repository1)
os.mkdir(self.remote_repository2)
def log(*args,**kw):
for arg in args:
print "TESTNODE LOG : %r, %r" % (arg, kw)
self.log = log
logging.getLogger().addHandler(self._handler)
def tearDown(self):
shutil.rmtree(self._temp_dir, True)
logging.getLogger().removeHandler(self._handler)
def getTestNode(self):
# XXX how to get property the git path ?
......@@ -79,7 +89,12 @@ class ERP5TestNode(TestCase):
config["frontend_url"] = "http://frontend/"
config["software_list"] = ["foo", "bar"]
return TestNode(self.log, config)
testnode = TestNode(config)
# By default, keep suite logs to stdout for easier debugging
# (stdout/stderr are automatically reported to ERP5).
# This is unset by test methods that check normal suite logging.
testnode.suiteLog = dummySuiteLog
return testnode
def getTestSuiteData(self, add_third_repository=False,
add_broken_repository=False, reference="foo"):
......@@ -219,7 +234,8 @@ class ERP5TestNode(TestCase):
test_node.test_suite_portal.getTestNode = TaskDistributor.getTestType
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite, add_third_repository=True)
node_test_suite.revision = 'rep1=1234-azerty,rep2=3456-qwerty'
node_test_suite.revision_list = (('rep1', (1234, 'azerty')),
('rep2', (3456, 'qwerty')))
test_node.constructProfile(node_test_suite,my_test_type)
self.assertEquals("%s/software.cfg" % (node_test_suite.working_directory,),
node_test_suite.custom_profile_path)
......@@ -234,12 +250,14 @@ repository = %(temp_dir)s/testnode/foo/rep1
branch = master
revision =
develop = false
shared = true
[rep2]
repository = %(temp_dir)s/testnode/foo/rep2
branch = foo
revision =
develop = false
shared = true
""" % {'temp_dir': self._temp_dir}
else:
revision1 = "azerty"
......@@ -253,17 +271,23 @@ repository = <obfuscated_url>/rep1/rep1.git
revision = %(revision1)s
ignore-ssl-certificate = true
develop = false
shared = true
[rep2]
repository = <obfuscated_url>/rep2/rep2.git
revision = %(revision2)s
ignore-ssl-certificate = true
develop = false
shared = true
""" % {'temp_dir': self._temp_dir, 'revision1': revision1, 'revision2': revision2}
self.assertEquals(expected_profile, profile.read())
profile.close()
def test_05_getAndUpdateFullRevisionList(self):
def getAndUpdateFullRevisionList(self, test_node, node_test_suite):
if test_node.updateRevisionList(node_test_suite):
return node_test_suite.revision.split(',')
def test_05_updateRevisionList(self):
"""
Check if we clone correctly repositories and get right revisions
"""
......@@ -271,7 +295,7 @@ develop = false
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite)
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
self.assertEquals(2, len(rev_list))
self.assertEquals(rev_list[0], 'rep0=2-%s' % commit_dict['rep0'][0][0])
self.assertEquals(rev_list[1], 'rep1=2-%s' % commit_dict['rep1'][0][0])
......@@ -280,7 +304,7 @@ develop = false
my_file.close()
call = self.getCaller(cwd=self.remote_repository1)
call("git commit -av -m new_commit".split())
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
self.assertTrue(rev_list[0].startswith('rep0=2-'))
self.assertTrue(rev_list[1].startswith('rep1=3-'))
self.assertEquals(2, len(node_test_suite.vcs_repository_list))
......@@ -296,7 +320,7 @@ develop = false
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite, add_third_repository=True)
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
self.assertEquals(3, len(rev_list))
self.assertEquals(3, len(node_test_suite.vcs_repository_list))
rep2_clone_path = [x['repository_path'] for x in \
......@@ -310,7 +334,7 @@ develop = false
self.assertEquals(vcs_repository_info['branch'], 'foo')
# change it to master
vcs_repository_info['branch'] = 'master'
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
output = call("git branch".split()).strip()
print output
self.assertTrue("* master" in output.split('\n'))
......@@ -318,7 +342,7 @@ develop = false
remote_call = self.getCaller(cwd=self.remote_repository2)
output = remote_call('git checkout master -b bar'.split())
vcs_repository_info['branch'] = 'bar'
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
output = call("git branch".split()).strip()
self.assertTrue("* bar" in output.split('\n'))
# Add a fourth branch on remote, make sure we could switch to it
......@@ -341,7 +365,7 @@ develop = false
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite)
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
self.assertEquals(2, len(rev_list))
self.assertEquals(2, len(node_test_suite.vcs_repository_list))
# patch deleteRepository to make sure it will be called once for the wrong
......@@ -362,7 +386,7 @@ develop = false
call = self.getCaller(cwd=rep0_clone_path)
self.assertEquals(call("git config --get remote.origin.url".split()).strip(),
self.remote_repository0)
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
self.assertEquals(call("git config --get remote.origin.url".split()).strip(),
self.remote_repository2)
self.assertEquals([rep0_clone_path], deleted_repository_path_list)
......@@ -380,7 +404,7 @@ develop = false
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite)
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
self.assertEquals(2, len(rev_list))
self.assertEquals(2, len(node_test_suite.vcs_repository_list))
rep0_clone_path = [x['repository_path'] for x in \
......@@ -390,7 +414,7 @@ develop = false
my_file.write("next_content")
my_file.close()
# make sure code still works
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
self.assertEqual(2, len(rev_list))
self.assertEqual(2, len(node_test_suite.vcs_repository_list))
# and check local change was resetted
......@@ -408,7 +432,7 @@ develop = false
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite, add_broken_repository=True)
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
self.assertEqual(None, rev_list)
def test_06_checkRevision(self):
......@@ -419,7 +443,7 @@ develop = false
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite)
rev_list = test_node.getAndUpdateFullRevisionList(node_test_suite)
rev_list = self.getAndUpdateFullRevisionList(test_node, node_test_suite)
def getRepInfo(count=0, hash=0):
assert count or hash
info_list = []
......@@ -436,14 +460,13 @@ develop = false
self.assertEquals([commit_dict['rep0'][0][0],commit_dict['rep1'][0][0]],
getRepInfo(hash=1))
class TestResult(object):
pass
revision = NodeTestSuite.revision
test_result = TestResult()
# for test result to be one commit late for rep1 to force testnode to
# reset tree to older version
test_result.revision = 'rep0=2-%s,rep1=1-%s' % (commit_dict['rep0'][0][0],
commit_dict['rep1'][1][0])
test_result.revision_list = (('rep0', (2, commit_dict['rep0'][0][0])),
('rep1', (1, commit_dict['rep1'][1][0])))
test_node.checkRevision(test_result, node_test_suite)
expected_count_list = ['2', '1']
self.assertEquals(['2', '1'], getRepInfo(count=1))
self.assertEquals([commit_dict['rep0'][0][0],commit_dict['rep1'][1][0]],
getRepInfo(hash=1))
......@@ -519,24 +542,6 @@ develop = false
expected_parameter_list += option
checkRunTestSuiteParameters()
def test_08_getSupportedParamaterSet(self):
original_spawn = ProcessManager.spawn
try:
def get_help(self, *args, **kw):
return {'stdout': """My Program
--foo foo
--bar bar"""}
ProcessManager.spawn = get_help
process_manager = ProcessManager(log=None)
parameter_list = ['--foo', '--baz']
expected_suported_parameter_set = set(['--foo'])
supported_parameter_set = process_manager.getSupportedParameterSet(
"dummy_program_path", parameter_list)
self.assertEquals(expected_suported_parameter_set, supported_parameter_set)
finally:
ProcessManager.spawn = original_spawn
def test_10_prepareSlapOS(self, my_test_type='UnitTest'):
test_node = self.getTestNode()
test_node_slapos = SlapOSInstance(self.slapos_directory)
......@@ -643,13 +648,10 @@ develop = false
allow_restart=False, test_title=None, project_title=None):
global counter
# return no test to check if run method will run the next test suite
if counter == 3 and project_title != 'qux':
result = None
else:
if counter != 3 or project_title == 'qux':
test_result_path = os.path.join(test_result_path_root, test_title)
result = TestResultProxy(self._proxy, self._retry_time,
self._logger, test_result_path, node_title, revision)
return result
return TestResultProxy(self._proxy, self._retry_time,
logger, test_result_path, node_title, revision)
def patch_runTestSuite(self, *argv, **kw):
return {'status_code':0}
original_sleep = time.sleep
......@@ -716,7 +718,7 @@ develop = false
def _checkCorrectStatus(expected_status,*args):
result = process_manager.spawn(*args)
self.assertEqual(result['status_code'], expected_status)
process_manager = ProcessManager(log=self.log, max_timeout=1)
process_manager = ProcessManager(max_timeout=1)
_checkCorrectStatus(0, *['sleep','0'])
# We must make sure that if the command is too long that
# it will be automatically killed
......@@ -724,8 +726,7 @@ develop = false
def test_13_SlaposControlerResetSoftware(self):
test_node = self.getTestNode()
controler = SlapOSControler(self.working_directory,
test_node.config, self.log)
controler = SlapOSControler(self.working_directory, test_node.config)
os.mkdir(controler.software_root)
file_name = 'AC_Ra\xc3\xadzertic\xc3\xa1ma'
non_ascii_file = open(os.path.join(controler.software_root, file_name), 'w')
......@@ -787,30 +788,26 @@ develop = false
def patch_createTestResult(self, revision, test_name_list, node_title,
allow_restart=False, test_title=None, project_title=None):
test_result_path = os.path.join(test_result_path_root, test_title)
result = TestResultProxy(self._proxy, self._retry_time,
self._logger, test_result_path, node_title, revision)
return result
return TestResultProxy(self._proxy, self._retry_time,
logger, test_result_path, node_title, revision)
def patch_runTestSuite(self,*argv, **kw):
return {'status_code':0}
def checkTestSuite(test_node):
test_node.node_test_suite_dict
rand_part_set = set()
self.assertEquals(2, len(test_node.node_test_suite_dict))
self.assertIsNot(test_node.suite_log, None)
self.assertTrue(isinstance(test_node.suite_log, types.MethodType))
for ref, suite in test_node.node_test_suite_dict.items():
self.assertTrue('var/log/testnode/%s' % suite.reference in \
suite.suite_log_path,
"Incorrect suite log path : %r" % suite.suite_log_path)
self.assertTrue(suite.suite_log_path.endswith('suite.log'))
m = re.match('.*\-(.*)\/suite.log', suite.suite_log_path)
m = re.search('-(.*)/suite.log$', suite.suite_log_path)
rand_part = m.groups()[0]
self.assertEqual(len(rand_part), 10)
self.assertNotIn(rand_part, rand_part_set)
rand_part_set.add(rand_part)
suite_log = open(suite.suite_log_path, 'r')
self.assertEquals(1, len([x for x in suite_log.readlines() \
if x.find("Activated logfile")>=0]))
with open(suite.suite_log_path) as suite_log:
self.assertIn("Getting configuration from test suite",
suite_log.readline())
RunnerClass = test_type_registry[my_test_type]
original_sleep = time.sleep
......@@ -846,6 +843,7 @@ develop = false
original_createTestResult = TaskDistributor.createTestResult
TaskDistributor.createTestResult = patch_createTestResult
test_node = self.getTestNode()
del test_node.suiteLog
# Change UnitTestRunner class methods
original_prepareSlapOS = RunnerClass._prepareSlapOS
......@@ -943,7 +941,7 @@ develop = false
SlapOSControler.runSoftwareRelease = runSoftwareRelease
def callPrepareSlapOS():
runner._prepareSlapOS(self.working_directory, node_test_suite,
test_node.log, create_partition=0)
create_partition=0)
def callRaisingPrepareSlapos():
self.assertRaises(SubprocessError, callPrepareSlapOS)
......@@ -998,9 +996,8 @@ develop = false
def patch_createTestResult(self, revision, test_name_list, node_title,
allow_restart=False, test_title=None, project_title=None):
test_result_path = os.path.join(test_result_path_root, test_title)
result = TestResultProxy(self._proxy, self._retry_time,
self._logger, test_result_path, node_title, revision)
return result
return TestResultProxy(self._proxy, self._retry_time,
logger, test_result_path, node_title, revision)
global startTestSuiteDone
startTestSuiteDone = False
def patch_startTestSuite(self,node_title,computer_guid='unknown'):
......
......@@ -32,6 +32,7 @@ import threading
import signal
import sys
import time
from . import logger
MAX_TIMEOUT = 3600 * 4
......@@ -67,28 +68,28 @@ def format_command(*args, **kw):
cmdline.append(v)
return ' '.join(cmdline)
def subprocess_capture(p, log, log_prefix, get_output=True):
def readerthread(input, output, buffer):
def subprocess_capture(p, log_prefix, get_output=True):
log = logger.info
if log_prefix:
log_prefix += ': '
def readerthread(input, buffer):
while True:
data = input.readline()
if not data:
break
if get_output:
buffer.append(data)
if log_prefix:
data = "%s : " % log_prefix + data
data = data.rstrip('\n')
output(data)
log(log_prefix + data.rstrip('\n'))
if p.stdout:
stdout = []
stdout_thread = threading.Thread(target=readerthread,
args=(p.stdout, log, stdout))
args=(p.stdout, stdout))
stdout_thread.daemon = True
stdout_thread.start()
if p.stderr:
stderr = []
stderr_thread = threading.Thread(target=readerthread,
args=(p.stderr, log, stderr))
args=(p.stderr, stderr))
stderr_thread.daemon = True
stderr_thread.start()
p.wait()
......@@ -99,7 +100,7 @@ def subprocess_capture(p, log, log_prefix, get_output=True):
return (p.stdout and ''.join(stdout),
p.stderr and ''.join(stderr))
def killCommand(pid, log):
def killCommand(pid):
"""
To prevent processes from reacting to the KILL of other processes,
we STOP them all first, and we repeat until the list of children does not
......@@ -118,21 +119,20 @@ def killCommand(pid, log):
try:
child.suspend()
except psutil.Error, e:
log("killCommand/suspend: %s", e)
logger.debug("killCommand/suspend: %s", e)
time.sleep(1)
new_list = set(process.children(recursive=True)).difference(process_list)
for process in process_list:
try:
process.kill()
except psutil.Error, e:
log("killCommand/kill: %s", e)
logger.debug("killCommand/kill: %s", e)
class ProcessManager(object):
stdin = file(os.devnull)
def __init__(self, log, max_timeout=MAX_TIMEOUT):
self.log = log
def __init__(self, max_timeout=MAX_TIMEOUT):
self.process_pid_set = set()
signal.signal(signal.SIGTERM, self.sigterm_handler)
self.under_cancellation = False
......@@ -142,19 +142,17 @@ class ProcessManager(object):
self.timer_set = set()
def spawn(self, *args, **kw):
def timeoutExpired(p, log):
def timeoutExpired(p):
if p.poll() is None:
log('PROCESS TOO LONG OR DEAD, GOING TO BE TERMINATED')
killCommand(p.pid, log)
logger.warning('PROCESS TOO LONG OR DEAD, GOING TO BE TERMINATED')
killCommand(p.pid)
raise SubprocessError('Dead or too long process killed')
if self.under_cancellation:
raise CancellationError("Test Result was cancelled")
get_output = kw.pop('get_output', True)
log_prefix = kw.pop('log_prefix', '')
new_session = kw.pop('new_session', True)
log = kw.pop('log', None)
if log is None:
log = self.log
subprocess_kw = {}
cwd = kw.pop('cwd', None)
if cwd:
......@@ -164,17 +162,16 @@ class ProcessManager(object):
raise_error_if_fail = kw.pop('raise_error_if_fail', True)
env = kw and dict(os.environ, **kw) or None
command = format_command(*args, **kw)
log('subprocess_kw : %r' % (subprocess_kw,))
log('$ ' + command)
logger.info('subprocess_kw : %r', subprocess_kw)
logger.info('$ %s', command)
sys.stdout.flush()
p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env, **subprocess_kw)
self.process_pid_set.add(p.pid)
timer = threading.Timer(self.max_timeout, timeoutExpired, args=(p, log))
timer = threading.Timer(self.max_timeout, timeoutExpired, args=(p,))
self.timer_set.add(timer)
timer.start()
stdout, stderr = subprocess_capture(p, log, log_prefix,
get_output=get_output)
stdout, stderr = subprocess_capture(p, log_prefix, get_output=get_output)
timer.cancel()
self.timer_set.discard(timer)
result = dict(status_code=p.returncode, command=command,
......@@ -211,32 +208,39 @@ class ProcessManager(object):
continue
except (psutil.AccessDenied, psutil.NoSuchProcess):
continue
self.log('ProcesssManager, killall on %s having pid %s',
logger.debug('ProcesssManager, killall on %s having pid %s',
name, process.pid)
to_kill_list.append(process.pid)
for pid in to_kill_list:
killCommand(pid, self.log)
killCommand(pid)
def killPreviousRun(self, cancellation=False):
self.log('ProcessManager killPreviousRun, going to kill %r',
logger.debug('ProcessManager killPreviousRun, going to kill %r',
self.process_pid_set)
if cancellation:
self.under_cancellation = True
for timer in self.timer_set:
timer.cancel()
for pgpid in self.process_pid_set:
killCommand(pgpid, self.log)
killCommand(pgpid)
try:
pid_file = self.supervisord_pid_file
except AttributeError:
pass
else:
del self.supervisord_pid_file
try:
if os.path.exists(self.supervisord_pid_file):
with open(self.supervisord_pid_file) as f:
supervisor_pid = int(f.read().strip())
self.log('ProcessManager killPreviousRun, going to kill supervisor with pid %r',
supervisor_pid)
os.kill(supervisor_pid, signal.SIGTERM)
if os.path.exists(pid_file):
with open(pid_file) as f:
pid = int(f.read().strip())
logger.debug('ProcessManager killPreviousRun,'
' going to kill supervisor with pid %r', pid)
os.kill(pid, signal.SIGTERM)
except Exception:
self.log('ProcessManager killPreviousRun, exception when killing supervisor')
logger.exception(
'ProcessManager killPreviousRun, exception when killing supervisor')
self.process_pid_set.clear()
def sigterm_handler(self, signal, frame):
self.log('SIGTERM_HANDLER')
logger.debug('SIGTERM_HANDLER')
sys.exit(1)
......@@ -47,6 +47,8 @@ from erp5.util import taskdistribution
import signal
import slapos.slap
from . import logger
# max time to instance changing state: 2 hour
MAX_INSTANCE_TIME = 60*60*2
# max time to register instance to slapOSMaster: 5 minutes
......@@ -59,12 +61,9 @@ MAX_PREPARE_TEST_SUITE = 3600*10*1.0 # 10 hours
class ScalabilityTestRunner():
def __init__(self, testnode):
self.testnode = testnode
self.log = self.testnode.log
self.slapos_controler = SlapOSControler.SlapOSControler(
self.testnode.working_directory,
self.testnode.config,
self.log)
self.testnode.config)
# Create the slapos account configuration file and dir
key = self.testnode.test_suite_portal.getSlaposAccountKey()
certificate = self.testnode.test_suite_portal.getSlaposAccountCertificate()
......@@ -80,8 +79,8 @@ class ScalabilityTestRunner():
# Get Slapos Master url used for api rest (using hateoas)
self.slapos_api_rest_url = self.testnode.test_suite_portal.getSlaposHateoasUrl()
self.log("SlapOS Master url is: %s" %self.slapos_url)
self.log("SlapOS Master hateoas url is: %s" %self.slapos_api_rest_url)
logger.info("SlapOS Master url is: %s", self.slapos_url)
logger.info("SlapOS Master hateoas url is: %s", self.slapos_api_rest_url)
self.key_path, self.cert_path, config_path = self.slapos_controler.createSlaposConfigurationFileAccount(
key, certificate, self.slapos_url, self.testnode.config)
......@@ -102,7 +101,7 @@ class ScalabilityTestRunner():
"""
A proxy to supply : Install a software on a specific node
"""
self.log("testnode, supply : %s %s", software_path, computer_guid)
logger.info("testnode, supply : %s %s", software_path, computer_guid)
if self.authorize_supply :
self.remaining_software_installation_dict[computer_guid] = software_path
self.slapos_communicator.supply(software_path, computer_guid)
......@@ -145,7 +144,7 @@ class ScalabilityTestRunner():
Create scalability instance
"""
if self.authorize_request:
self.log("testnode, request : %s", instance_title)
logger.info("testnode, request : %s", instance_title)
config = self._generateInstanceXML(software_configuration,
test_result, test_suite)
request_kw = {"partition_parameter_kw": {"_" : json.dumps(config)} }
......@@ -168,14 +167,15 @@ ces or already launched.")
# Dummy slapos answering
def _getSignal(self, signal, frame):
self.log("Dummy SlapOS Master answer received.")
logger.debug("Dummy SlapOS Master answer received.")
self.last_slapos_answer.append(True)
def _prepareDummySlapOSAnswer(self):
self.log("Dummy slapOS answer enabled, send signal to %s (kill -10 %s) to simu\
late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
pid = os.getpid()
logger.info("Dummy slapOS answer enabled, send signal to %s (kill -USR1 %s)"
" to simulate a SlapOS (positive) answer.", pid, pid)
signal.signal(signal.SIGUSR1, self._getSignal)
def _comeBackFromDummySlapOS(self):
self.log("Dummy slapOS answer disabled, please don't send more signals.")
logger.info("Dummy slapOS answer disabled, please don't send more signals.")
# use SIG_USR (kill)
signal.signal(signal.SIGUSR1, signal.SIG_DFL)
def simulateSlapOSAnswer(self):
......@@ -190,7 +190,8 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
Return true if the specified software on the specified node is installed.
This method should communicates with SlapOS Master.
"""
self.log("Current software state: " + str(self.slapos_communicator._getSoftwareState()))
logger.info("Current software state: %s",
self.slapos_communicator._getSoftwareState())
return self.slapos_communicator._getSoftwareState() == SlapOSMasterCommunicator.SOFTWARE_STATE_INSTALLED
def remainSoftwareToInstall(self):
......@@ -206,7 +207,7 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
def _updateInstanceXML(self, software_configuration, instance_title,
test_result, test_suite):
self.log("testnode, updateInstanceXML : %s", instance_title)
logger.info("testnode, updateInstanceXML : %s", instance_title)
config = self._generateInstanceXML(software_configuration,
test_result, test_suite)
request_kw = {"partition_parameter_kw": {"_" : json.dumps(config)} }
......@@ -217,15 +218,15 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
"""
Wait for 'max_time' the instance creation
"""
self.log("Waiting for instance creation...")
logger.debug("Waiting for instance creation...")
start_time = time.time()
while (not self.slapos_communicator.isInstanceRequested(instance_title) \
and (max_time > (time.time()-start_time)) ):
self.log("Instance not ready yet. Sleeping 5 sec.")
logger.debug("Instance not ready yet. Sleeping 5 sec.")
time.sleep(5)
if (time.time()-start_time) > max_time:
raise ValueError("Instance '%s' not found after %s seconds" %(instance_title, max_time))
self.log("Instance found on slapOSMaster")
logger.debug("Instance found on slapOSMaster")
def _initializeSlapOSConnection(self):
"""
......@@ -244,7 +245,8 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
slapgrid_rest_uri=self.slapos_api_rest_url)
if getattr(slap, '_hateoas_navigator', None) is None:
retry += 1
self.log("Fail to load _hateoas_navigator waiting a bit and retry.")
logger.info(
"Fail to load _hateoas_navigator waiting a bit and retry.")
time.sleep(30)
else:
break
......@@ -268,12 +270,13 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
not os.path.exists(self.obfuscated_link_path) ) :
try :
os.symlink(path_to_suite, self.obfuscated_link_path)
self.log("testnode, Symbolic link (%s->%s) created."
%(self.obfuscated_link_path, path_to_suite))
except :
self.log("testnode, Unable to create symbolic link to the testsuite.")
raise ValueError("testnode, Unable to create symbolic link to the testsuite.")
self.log("Sym link : %s %s" %(path_to_suite, self.obfuscated_link_path))
logger.info("testnode, Symbolic link (%s->%s) created.",
self.obfuscated_link_path, path_to_suite)
except Exception:
msg = "testnode, Unable to create symbolic link to the testsuite."
logger.exception(msg)
raise ValueError(msg)
logger.info("Sym link : %s %s", path_to_suite, self.obfuscated_link_path)
# Construct the ipv6 obfuscated url of the software profile reachable from outside
self.reachable_address = os.path.join(
......@@ -301,7 +304,7 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
"""
Install testsuite softwares
"""
self.log('Preparing SlapOS for Test Suite...')
logger.debug('Preparing SlapOS for Test Suite...')
max_time = MAX_PREPARE_TEST_SUITE
interval_time = 60
start_time = time.time()
......@@ -320,9 +323,9 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
self.error_message = test_configuration['error_message']
self.randomized_path = test_configuration['randomized_path']
if not self.launchable:
self.log("Test suite %s is not actually launchable with \
the current cluster configuration." %(node_test_suite.test_suite_title,))
self.log("ERP5 Master indicates : %s" %(self.error_message,))
logger.info("Test suite %s is not actually launchable"
" with the current cluster configuration.", node_test_suite.test_suite_title)
logger.info("ERP5 Master indicates : %s", self.error_message)
return {'status_code' : 1}
configuration_list = test_configuration['configuration_list']
......@@ -331,12 +334,12 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
self.instance_title = self._generateInstanceTitle(node_test_suite.test_suite_title)
self.createSoftwareReachableProfilePath(node_test_suite)
self.log("Software reachable profile path is : %s " %(self.reachable_profile))
logger.info("Software reachable profile path is: %s",
self.reachable_profile)
# Initialize SlapOS Master Communicator
self.slapos_communicator = SlapOSMasterCommunicator.SlapOSTester(
self.instance_title,
self.log,
slap,
order,
supply,
......@@ -353,9 +356,9 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
# Waiting until all softwares are installed
while (self.remainSoftwareToInstall()
and (max_time > (time.time()-start_time))):
self.log("Master testnode is waiting\
for the end of all software installation (for %ss) PID=%s.",
str(int(time.time()-start_time)), str(os.getpid()))
logger.info("Master testnode is waiting for the end of"
" all software installation (for %ss) PID=%s.",
int(time.time()-start_time), os.getpid())
time.sleep(interval_time)
# TODO : remove the line below wich simulate an answer from slapos master
self._comeBackFromDummySlapOS()
......@@ -363,7 +366,7 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
# All softwares are not installed, however maxtime is elapsed, that's a failure.
return {'status_code' : 1}
self.authorize_request = True
self.log("Softwares installed.")
logger.debug("Softwares installed.")
# Launch instance
try:
self._createInstance(self.reachable_profile,
......@@ -371,10 +374,11 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
self.instance_title,
node_test_suite.test_result,
node_test_suite.test_suite)
self.log("Scalability instance requested.")
except:
self.log("Unable to launch instance")
raise ValueError("Unable to launch instance")
logger.debug("Scalability instance requested.")
except Exception:
msg = "Unable to launch instance"
logger.exception(msg)
raise ValueError(msg)
self._waitInstanceCreation(self.instance_title)
return {'status_code' : 0}
......@@ -383,7 +387,7 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
def runTestSuite(self, node_test_suite, portal_url):
if not self.launchable:
self.log("Current test_suite is not actually launchable.")
logger.info("Current test_suite is not actually launchable.")
return {'status_code' : 1} # Unable to continue due to not realizable configuration
configuration_list = node_test_suite.configuration_list
test_list = range(0, len(configuration_list))
......@@ -392,7 +396,7 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
self.testnode.config['test_node_title'],
True, node_test_suite.test_suite_title,
node_test_suite.project_title)
self.log("Test Result created.")
logger.debug("Test Result created.")
count = 0
error_message = None
......@@ -410,11 +414,11 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
self.slapos_communicator.requestInstanceStart()
self.slapos_communicator.waitInstanceStarted(self.instance_title)
self.log("[DEBUG] INSTANCE CORRECTLY STARTED")
logger.debug("INSTANCE CORRECTLY STARTED")
# ROQUE XXX : for debug
if True:
self.log("RETURN FOR DEBUG")
logger.debug("RETURN FOR DEBUG")
return {'status_code' : 0}
# Start only the current test
......@@ -426,7 +430,7 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
error_message = "Test case already tested."
break
self.log("Test for count : %d is in a running state." %count)
logger.info("Test for count : %d is in a running state.", count)
# Wait for test case ending
test_case_start_time = time.time()
......@@ -463,14 +467,14 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
test_result_line_proxy.stop(error_count=1, failure_count=1,
stdout=error_message, stderr=error_message)
test_result_proxy.reportFailure(stdout=error_message)
self.log("Test Failed.")
logger.debug("Test Failed.")
return {'status_code' : 1, 'error_message': error_message}
# Test is finished.
self.log("Test finished.")
logger.debug("Test finished.")
return {'status_code' : 0}
def _cleanUpOldInstance(self):
self.log("_cleanUpOldInstance")
logger.debug("_cleanUpOldInstance")
# Get title and link list of all instances
instance_dict = self.slapos_communicator.getHostingSubscriptionDict()
......@@ -502,7 +506,8 @@ late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
computer_guid=instance_information_dict['computer_guid'],
state='destroyed'
)
self.log("Instance '%s' deleted." %instance_information_dict['title'])
logger.debug("Instance '%s' deleted.",
instance_information_dict['title'])
def _cleanUpNodesInformation(self):
self.involved_nodes_computer_guid = []
......
......@@ -30,9 +30,9 @@ import slapos.slap
import subprocess
import time
import xml_marshaller
import sys
import argparse
from slapos import client
from . import logger
from .Utils import createFolder
MAX_PARTITIONS = 10
......@@ -40,12 +40,11 @@ MAX_SR_RETRIES = 3
class SlapOSControler(object):
def __init__(self, working_directory, config, log):
def __init__(self, working_directory, config):
self.config = config
self.software_root = os.path.join(working_directory, 'soft')
self.instance_root = os.path.join(working_directory, 'inst')
self.slapos_config = os.path.join(working_directory, 'slapos.cfg')
self.log = log
self.proxy_database = os.path.join(working_directory, 'proxy.db')
self.instance_config = {}
......@@ -80,7 +79,7 @@ class SlapOSControler(object):
Ex :
my_controler.supply('kvm.cfg', 'COMP-726')
"""
self.log('SlapOSControler : supply')
logger.debug('SlapOSControler : supply')
parser = argparse.ArgumentParser()
parser.add_argument("configuration_file")
parser.add_argument("software_url")
......@@ -92,9 +91,9 @@ class SlapOSControler(object):
try:
local = client.init(config)
local['supply'](software_url, computer_guid=computer_id, state=state)
self.log('SlapOSControler : supply %s %s %s', software_url, computer_id, state)
logger.debug('SlapOSControler: supply %s %s %s', software_url, computer_id, state)
except Exception:
self.log("SlapOSControler.supply", exc_info=sys.exc_info())
logger.exception("SlapOSControler.supply")
raise ValueError("Unable to supply (or remove)")
else:
raise ValueError("Configuration file not found.")
......@@ -113,7 +112,7 @@ class SlapOSControler(object):
'kvm.cfg', 'cluster', { "_" : "{'toto' : 'titi'}" } )
"""
self.log('SlapOSControler : request-->SlapOSMaster')
logger.debug('SlapOSControler : request-->SlapOSMaster')
current_intance_config = {'software_type':software_type,
'software_configuration':software_configuration,
'computer_guid':computer_guid,
......@@ -145,10 +144,10 @@ class SlapOSControler(object):
if state == 'destroyed':
del self.instance_config[reference]
elif state == 'started':
self.log('Instance started with configuration: %s',
logger.debug('Instance started with configuration: %s',
software_configuration)
except Exception:
self.log("SlapOSControler.request", exc_info=sys.exc_info())
logger.exception("SlapOSControler.request")
raise ValueError("Unable to do this request")
else:
raise ValueError("Configuration file not found.")
......@@ -163,21 +162,21 @@ class SlapOSControler(object):
)
def destroyInstance(self, reference):
self.log('SlapOSControler : delete instance')
logger.debug('SlapOSControler : delete instance')
try:
self._requestSpecificState(reference, 'destroyed')
except Exception:
raise ValueError("Can't delete instance %r (instance not created?)" % reference)
def stopInstance(self, reference):
self.log('SlapOSControler : stop instance')
logger.debug('SlapOSControler : stop instance')
try:
self._requestSpecificState(reference, 'stopped')
except Exception:
raise ValueError("Can't stop instance %r (instance not created?)" % reference)
def startInstance(self, reference):
self.log('SlapOSControler : start instance')
logger.debug('SlapOSControler : start instance')
try:
self._requestSpecificState(reference, 'started')
except Exception:
......@@ -188,7 +187,7 @@ class SlapOSControler(object):
Update the XML configuration of an instance
# Request same instance with different parameters.
"""
self.log('SlapOSControler : updateInstanceXML will request same'
logger.debug('SlapOSControler : updateInstanceXML will request same'
' instance with new XML configuration...')
try:
......@@ -203,7 +202,7 @@ class SlapOSControler(object):
raise ValueError("Can't update instance '%s' (may not exist?)" %reference)
def _resetSoftware(self):
self.log('SlapOSControler : GOING TO RESET ALL SOFTWARE : %r',
logger.info('SlapOSControler: GOING TO RESET ALL SOFTWARE : %r',
self.software_root)
createFolder(self.software_root, True)
......@@ -211,7 +210,7 @@ class SlapOSControler(object):
reset_software=False, software_path_list=None):
self.process_manager = process_manager
self.software_path_list = software_path_list
self.log('SlapOSControler, initialize, reset_software: %r', reset_software)
logger.debug('SlapOSControler, initialize, reset_software: %r', reset_software)
config = self.config
slapos_config_dict = config.copy()
slapos_config_dict.update(software_root=self.software_root,
......@@ -248,8 +247,7 @@ class SlapOSControler(object):
computer_guid=config['computer_id'])
computer = slap.registerComputer(config['computer_id'])
except Exception:
self.log("SlapOSControler.initializeSlapOSControler",
exc_info=sys.exc_info())
logger.exception("SlapOSControler.initializeSlapOSControler")
raise ValueError("Unable to registerSupply")
# Reset all previously generated software if needed
if reset_software:
......@@ -289,7 +287,7 @@ class SlapOSControler(object):
return self.process_manager.spawn(*args, **kw)
def runSoftwareRelease(self, config, environment, **kw):
self.log("SlapOSControler.runSoftwareRelease")
logger.debug("SlapOSControler.runSoftwareRelease")
cpu_count = str(os.sysconf("SC_NPROCESSORS_ONLN"))
os.environ['MAKEFLAGS'] = '-j' + cpu_count
os.environ['NPY_NUM_BUILD_JOBS'] = cpu_count
......@@ -309,7 +307,7 @@ class SlapOSControler(object):
def runComputerPartition(self, config, environment,
stdout=None, stderr=None, cluster_configuration=None, **kw):
self.log("SlapOSControler.runComputerPartition with cluster_config: %r",
logger.debug("SlapOSControler.runComputerPartition with cluster_config: %r",
cluster_configuration)
for path in self.software_path_list:
try:
......@@ -318,7 +316,7 @@ class SlapOSControler(object):
self.software_path_list.index(path),
partition_parameter_kw=cluster_configuration)
except Exception:
self.log("SlapOSControler.runComputerPartition", exc_info=sys.exc_info())
logger.exception("SlapOSControler.runComputerPartition")
raise ValueError("Unable to registerOpenOrder")
# try to run for all partitions as one partition may in theory request another one
......@@ -329,7 +327,7 @@ class SlapOSControler(object):
'--pidfile', os.path.join(self.instance_root, 'slapos-node.pid'),
'--cfg', self.slapos_config, raise_error_if_fail=False,
log_prefix='slapgrid_cp', get_output=False)
self.log('slapgrid_cp status_dict : %r', status_dict)
logger.debug('slapgrid_cp status_dict : %r', status_dict)
if not status_dict['status_code']:
break
else:
......
......@@ -10,6 +10,7 @@ import slapos.slap
from slapos.slap import SoftwareProductCollection
from requests.exceptions import HTTPError
from ..taskdistribution import SAFE_RPC_EXCEPTION_LIST
from . import logger
# max time to instance changing state: 2 hour
MAX_INSTANCE_TIME = 60*60*2
......@@ -61,8 +62,7 @@ def retryOnNetworkFailure(func,
class SlapOSMasterCommunicator(object):
latest_state = None
def __init__(self, slap, slap_supply, slap_order, url, logger):
self._logger = logger
def __init__(self, slap, slap_supply, slap_order, url):
self.slap = slap
self.slap_order = slap_order
self.slap_supply = slap_supply
......@@ -72,20 +72,20 @@ class SlapOSMasterCommunicator(object):
if url is not None and \
url.startswith(SOFTWARE_PRODUCT_NAMESPACE):
product = SoftwareProductCollection(self._logger, self.slap)
product = SoftwareProductCollection(logger, self.slap)
try:
url = product.__getattr__(url[len(SOFTWARE_PRODUCT_NAMESPACE):])
except AttributeError as e:
self._logger.warning('Error on get software release : %s ' % e.message)
logger.warning('Error on get software release: %s ', e.message)
self.url = url
@retryOnNetworkFailure
def _supply(self):
if self.computer_guid is None:
self._logger('Nothing to supply for %s.' % (self.name))
logger.info('Nothing to supply for %s.', self.name)
return None
self._logger('Supply %s@%s', self.url, self.computer_guid)
logger.info('Supply %s@%s', self.url, self.computer_guid)
return self.slap_supply.supply(self.url, self.computer_guid)
@retryOnNetworkFailure
......@@ -97,7 +97,7 @@ class SlapOSMasterCommunicator(object):
self.request_kw = json.loads(request_kw)
else:
self.request_kw = request_kw
self._logger('Request %s@%s: %s', self.url, self.name, state)
logger.info('Request %s@%s: %s', self.url, self.name, state)
self.latest_state = state
return self.slap_order.request(
software_release=self.url,
......@@ -252,7 +252,7 @@ class SlapOSMasterCommunicator(object):
@retryOnNetworkFailure
def _getInstanceState(self):
latest_state = self.latest_state
self._logger('latest_state = %r', latest_state)
logger.info('latest_state = %r', latest_state)
if latest_state is None:
return INSTANCE_STATE_UNKNOWN
......@@ -297,8 +297,8 @@ class SlapOSMasterCommunicator(object):
try:
monitor_information_dict = self.getRSSEntryFromMonitoring(monitor_v6_url)
except Exception:
self._logger('Unable to download promises for: %s' % (instance["title"]))
self._logger(traceback.format_exc())
logger.exception('Unable to download promises for: %s',
instance["title"])
monitor_information_dict = {"message": "Unable to download"}
message_list.append({
......@@ -311,11 +311,10 @@ class SlapOSMasterCommunicator(object):
})
except slapos.slap.ServerError:
self._logger('Got an error requesting partition for '
'its state')
logger.error('Got an error requesting partition for its state')
return INSTANCE_STATE_UNKNOWN
except Exception:
self._logger("ERROR getting instance state")
logger.error("ERROR getting instance state")
return INSTANCE_STATE_UNKNOWN
started = 0
......@@ -348,24 +347,24 @@ class SlapOSMasterCommunicator(object):
"""
Wait for 'max_time' an instance specific state
"""
self._logger("Waiting for instance state: %s" %state)
logger.info("Waiting for instance state: %s", state)
start_time = time.time()
while (not self._getInstanceState() == state
and (max_time > (time.time()-start_time))):
self._logger("Instance(s) not in %s state yet." % state)
self._logger("Current state: %s" % self._getInstanceState())
logger.info("Instance(s) not in %s state yet.", state)
logger.info("Current state: %s", self._getInstanceState())
time.sleep(15)
if (time.time()-start_time) > max_time:
error_message = "Instance '%s' not '%s' after %s seconds" %(instance_title, state, str(time.time()-start_time))
return {'error_message' : error_message}
self._logger("Instance correctly '%s' after %s seconds." %(state, str(time.time()-start_time)))
logger.info("Instance correctly '%s' after %s seconds.",
state, time.time() - start_time)
return {'error_message' : None}
class SlapOSTester(SlapOSMasterCommunicator):
def __init__(self,
name,
logger,
slap,
slap_order,
slap_supply,
......@@ -374,7 +373,7 @@ class SlapOSTester(SlapOSMasterCommunicator):
request_kw=None
):
super(SlapOSTester, self).__init__(
slap, slap_supply, slap_order, url, logger)
slap, slap_supply, slap_order, url)
self.name = name
self.computer_guid = computer_guid
......@@ -413,9 +412,10 @@ class SlapOSTester(SlapOSMasterCommunicator):
def waitInstanceStarted(self, instance_title):
error_message = self._waitInstance(instance_title, INSTANCE_STATE_STARTED)["error_message"]
if error_message is not None:
self._logger(error_message)
self._logger("Do you use instance state propagation in your project?")
self._logger("Instance '%s' will be stopped and test aborted." %instance_title)
logger.error(error_message)
logger.error("Do you use instance state propagation in your project?")
logger.error("Instance '%s' will be stopped and test aborted.",
instance_title)
self.requestInstanceStop()
time.sleep(60)
raise ValueError(error_message)
......@@ -423,15 +423,15 @@ class SlapOSTester(SlapOSMasterCommunicator):
def waitInstanceStopped(self, instance_title):
error_message = self._waitInstance(instance_title, INSTANCE_STATE_STOPPED)["error_message"]
if error_message is not None:
self._logger(error_message)
self._logger("Do you use instance state propagation in your project?")
logger.error(error_message)
logger.error("Do you use instance state propagation in your project?")
raise ValueError(error_message)
def waitInstanceDestroyed(self, instance_title):
error_message = self._waitInstance(instance_title, INSTANCE_STATE_DESTROYED)["error_message"]
if error_message is not None:
self._logger(error_message)
self._logger("Do you use instance state propagation in your project?")
logger.error(error_message)
logger.error("Do you use instance state propagation in your project?")
raise ValueError(error_message)
class SoftwareReleaseTester(SlapOSTester):
......@@ -439,7 +439,6 @@ class SoftwareReleaseTester(SlapOSTester):
def __init__(self,
name,
logger,
slap,
slap_order,
slap_supply,
......@@ -450,7 +449,7 @@ class SoftwareReleaseTester(SlapOSTester):
instance_timeout=3600,
):
super(SoftwareReleaseTester, self).__init__(
name, logger, slap, slap_order, slap_supply, url, computer_guid, request_kw)
name, slap, slap_order, slap_supply, url, computer_guid, request_kw)
self.state = TESTER_STATE_INITIAL
self.transition_dict = {
......@@ -536,7 +535,7 @@ class SoftwareReleaseTester(SlapOSTester):
"""
Interrupt a running test sequence, putting it in idle state.
"""
self._logger('Invoking TearDown for %s@%s' % (self.url, self.name))
logger.info('Invoking TearDown for %s@%s', self.url, self.name)
if self.request_kw is not None:
self._request(INSTANCE_STATE_DESTROYED)
if self.computer_guid is not None:
......@@ -548,7 +547,7 @@ class SoftwareReleaseTester(SlapOSTester):
Check for missed deadlines (-> test failure), conditions for moving to
next state, and actually moving to next state (executing its payload).
"""
self._logger('[DEBUG] TIC')
logger.debug('TIC')
deadline = self.deadline
if deadline < now and deadline is not None:
......@@ -562,7 +561,7 @@ class SoftwareReleaseTester(SlapOSTester):
instance_state is None or
instance_state == self._getInstanceState()):
self._logger('[DEBUG] Going to state %s (%r)', next_state, instance_state)
logger.debug('Going to state %s (%r)', next_state, instance_state)
if next_state is None:
return None
......
......@@ -27,6 +27,7 @@
import os
import glob
import json
from . import logger
from .ProcessManager import SubprocessError
from .SlapOSControler import SlapOSControler
from .Utils import createFolder
......@@ -49,21 +50,20 @@ class UnitTestRunner(object):
"""
return SlapOSControler(
working_directory,
self.testnode.config,
self.testnode.log)
self.testnode.config)
def _prepareSlapOS(self, working_directory, slapos_instance, log,
def _prepareSlapOS(self, working_directory, slapos_instance,
create_partition=1, software_path_list=None, **kw):
"""
Launch slapos to build software and partitions
"""
slapproxy_log = os.path.join(self.testnode.config['log_directory'],
'slapproxy.log')
log('Configured slapproxy log to %r', slapproxy_log)
logger.debug('Configured slapproxy log to %r', slapproxy_log)
reset_software = slapos_instance.retry_software_count > 10
if reset_software:
slapos_instance.retry_software_count = 0
log('testnode, retry_software_count : %r',
logger.info('testnode, retry_software_count: %r',
slapos_instance.retry_software_count)
# XXX Create a new controler because working_directory can be
......@@ -80,12 +80,12 @@ class UnitTestRunner(object):
method_list.append("runComputerPartition")
for method_name in method_list:
slapos_method = getattr(slapos_controler, method_name)
log("Before status_dict = slapos_method(...)")
logger.debug("Before status_dict = slapos_method(...)")
status_dict = slapos_method(self.testnode.config,
environment=self.testnode.config['environment'],
**kw)
log(status_dict)
log("After status_dict = slapos_method(...)")
logger.info(status_dict)
logger.debug("After status_dict = slapos_method(...)")
if status_dict['status_code'] != 0:
slapos_instance.retry = True
slapos_instance.retry_software_count += 1
......@@ -103,7 +103,7 @@ class UnitTestRunner(object):
# instance. This is a hack which must be removed.
config = self.testnode.config
return self._prepareSlapOS(test_node_slapos.working_directory,
test_node_slapos, self.testnode.log, create_partition=0,
test_node_slapos, create_partition=0,
software_path_list=config.get("software_list"),
cluster_configuration={
'report-url': config.get("report-url", ""),
......@@ -116,7 +116,7 @@ class UnitTestRunner(object):
Build softwares needed by testsuites
"""
return self._prepareSlapOS(node_test_suite.working_directory,
node_test_suite, self.testnode.log,
node_test_suite,
software_path_list=[node_test_suite.custom_profile_path],
cluster_configuration={'_': json.dumps(node_test_suite.cluster_configuration)})
......@@ -124,7 +124,7 @@ class UnitTestRunner(object):
return self._getSlapOSControler(
node_test_suite.working_directory).instance_root
def runTestSuite(self, node_test_suite, portal_url, log=None):
def runTestSuite(self, node_test_suite, portal_url):
config = self.testnode.config
run_test_suite_path_list = glob.glob(
self.getInstanceRoot(node_test_suite) + "/*/bin/runTestSuite")
......
......@@ -28,7 +28,7 @@ import errno
import os
import re
import shutil
import sys
from . import logger
from .ProcessManager import SubprocessError
SVN_UP_REV = re.compile(r'^(?:At|Updated to) revision (\d+).$')
......@@ -42,10 +42,9 @@ class Updater(object):
_git_cache = {}
def __init__(self, repository_path, log, revision=None, git_binary='git',
def __init__(self, repository_path, revision=None, git_binary='git',
branch=None, realtime_output=True, process_manager=None, url=None,
working_directory=None):
self.log = log
self.revision = revision
self._path_list = []
self.branch = branch
......@@ -111,16 +110,16 @@ class Updater(object):
git_repository_path = os.path.join(self.getRepositoryPath(), '.git')
name = os.path.basename(os.path.normpath(self.getRepositoryPath()))
git_repository_link_path = os.path.join(self.getRepositoryPath(), '%s.git' %name)
self.log("checking link %s -> %s..",
logger.debug("checking link %s -> %s..",
git_repository_link_path, git_repository_path)
if ( not os.path.lexists(git_repository_link_path) and \
not os.path.exists(git_repository_link_path) ):
try:
os.symlink(git_repository_path, git_repository_link_path)
self.log("link: %s -> %s created",
logger.debug("link: %s -> %s created",
git_repository_link_path, git_repository_path)
except OSError:
self.log("Cannot create link from %s -> %s",
logger.error("Cannot create link from %s -> %s",
git_repository_link_path, git_repository_path)
def _git_find_rev(self, ref):
......@@ -148,7 +147,7 @@ class Updater(object):
raise NotImplementedError
def deleteRepository(self):
self.log("Wrong repository or wrong url, deleting repos %s",
logger.info("Wrong repository or wrong url, deleting repos %s",
self.repository_path)
shutil.rmtree(self.repository_path)
......@@ -162,7 +161,7 @@ class Updater(object):
if remote_url == self.url:
correct_url = True
except SubprocessError:
self.log("SubprocessError", exc_info=sys.exc_info())
logger.exception("")
if not(correct_url):
self.deleteRepository()
if not os.path.exists(self.repository_path):
......
......@@ -30,9 +30,12 @@ import logging
import logging.handlers
import os
from .testnode import TestNode
log_formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def main(*args):
from .testnode import TestNode
parser = argparse.ArgumentParser()
parser.add_argument("configuration_file", nargs=1, type=argparse.FileType(),
help="Configuration file.")
......@@ -43,29 +46,24 @@ def main(*args):
parsed_argument = parser.parse_args(list(args))
else:
parsed_argument = parser.parse_args()
logger_format = '%(asctime)s %(name)-13s: %(levelname)-8s %(message)s'
formatter = logging.Formatter(logger_format)
logging.basicConfig(level=logging.INFO,
format=logger_format)
logger = logging.getLogger('erp5testnode')
CONFIG = {
'logger': logger.info,
'partition_reference': 'test0',
}
if parsed_argument.console or parsed_argument.logfile:
root = logging.getLogger()
def addHandler(handler):
handler.setFormatter(log_formatter)
root.addHandler(handler)
if parsed_argument.console:
logger.addHandler(logging.StreamHandler())
logger.info('Activated console output.')
addHandler(logging.StreamHandler())
if parsed_argument.logfile:
file_handler = logging.handlers.RotatingFileHandler(
addHandler(logging.handlers.RotatingFileHandler(
filename=parsed_argument.logfile,
maxBytes=20000000, backupCount=4)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.info('Activated logfile %r output', parsed_argument.logfile)
CONFIG['log_file'] = parsed_argument.logfile
maxBytes=20000000, backupCount=4))
else:
logger.addHandler(logging.NullHandler())
logger.disable(logging.CRITICAL)
CONFIG = {
'partition_reference': 'test0',
}
config = ConfigParser.SafeConfigParser()
# do not change case of option keys
config.optionxform = str
......@@ -106,5 +104,4 @@ def main(*args):
CONFIG['software_list'] = filter(None,
config.get("software_list", "path_list").split(","))
testnode = TestNode(logger.info, CONFIG)
testnode.run()
TestNode(CONFIG).run()
......@@ -25,13 +25,13 @@
#
##############################################################################
import os
import sys
import json
import time
import shutil
import logging
from contextlib import contextmanager
from slapos.slap.slap import ConnectionError
from . import logger, log_formatter
from .ProcessManager import SubprocessError, ProcessManager, CancellationError
from subprocess import CalledProcessError
from .Updater import Updater
......@@ -47,12 +47,6 @@ MAX_TEMP_TIME = 0.01 # time in days we should keep temp files
PROFILE_PATH_KEY = 'profile_path'
class DummyLogger(object):
def __init__(self, func):
for name in ('trace', 'debug', 'info', 'warn', 'warning', 'error',
'critical', 'fatal'):
setattr(self, name, func)
test_type_registry = {
'UnitTest': UnitTestRunner,
'ScalabilityTest': ScalabilityTestRunner,
......@@ -60,12 +54,10 @@ test_type_registry = {
class TestNode(object):
def __init__(self, log, config, max_log_time=MAX_LOG_TIME,
def __init__(self, config, max_log_time=MAX_LOG_TIME,
max_temp_time=MAX_TEMP_TIME):
self.testnode_log = log
self.log = log
self.config = config or {}
self.process_manager = ProcessManager(log)
self.process_manager = ProcessManager()
self.working_directory = config['working_directory']
self.node_test_suite_dict = {}
self.file_handler = None
......@@ -80,7 +72,7 @@ class TestNode(object):
for reference in reference_set:
fpath = os.path.join(self.working_directory, reference)
self.node_test_suite_dict.pop(reference, None)
self.log("testnode.purgeOldTestSuite, DELETING : %r", fpath)
logger.info("testnode.purgeOldTestSuite, DELETING : %r", fpath)
if os.path.isdir(fpath):
shutil.rmtree(fpath)
else:
......@@ -171,7 +163,7 @@ develop = false
def getAndUpdateFullRevisionList(self, node_test_suite):
full_revision_list = []
config = self.config
log = self.log
revision_list = []
try:
for vcs_repository in node_test_suite.vcs_repository_list:
repository_path = vcs_repository['repository_path']
......@@ -179,56 +171,41 @@ develop = false
branch = vcs_repository.get('branch')
# Make sure we have local repository
updater = Updater(repository_path, git_binary=config['git_binary'],
branch=branch, log=log, process_manager=self.process_manager,
branch=branch, process_manager=self.process_manager,
working_directory=node_test_suite.working_directory,
url=vcs_repository["url"])
updater.checkout()
revision_list.append((repository_id, updater.getRevision()))
except SubprocessError:
log("Error while getting repository, ignoring this test suite",
logger.warning("Error while getting repository, ignoring this test suite",
exc_info=1)
return False
node_test_suite.revision_list = revision_list
return True
def registerSuiteLog(self, test_result, node_test_suite):
"""
Create a log dedicated for the test suite,
and register the url to master node.
"""
@contextmanager
def suiteLog(self, node_test_suite):
suite_log_path, folder_id = node_test_suite.createSuiteLog()
self._initializeSuiteLog(suite_log_path)
# TODO make the path into url
test_result.reportStatus('LOG url', "%s/%s" % (self.config.get('httpd_url'),
folder_id), '')
self.log("going to switch to log %r", suite_log_path)
self.process_manager.log = self.log = self.suite_log
return suite_log_path
def _initializeSuiteLog(self, suite_log_path):
# remove previous handlers
logger = logging.getLogger('testsuite')
if self.file_handler is not None:
logger.removeHandler(self.file_handler)
# and replace it with new handler
logger_format = '%(asctime)s %(name)-13s: %(levelname)-8s %(message)s'
formatter = logging.Formatter(logger_format)
logging.basicConfig(level=logging.INFO, format=logger_format)
self.file_handler = logging.FileHandler(filename=suite_log_path)
self.file_handler.setFormatter(formatter)
logger.addHandler(self.file_handler)
logger.info('Activated logfile %r output', suite_log_path)
self.suite_log = logger.info
handler = logging.FileHandler(filename=suite_log_path)
handler.setFormatter(log_formatter)
logger.info('Suite logfile: %s', suite_log_path)
try:
logger.propagate = False
logger.addHandler(handler)
yield folder_id
finally:
logger.propagate = True
logger.removeHandler(handler)
def checkRevision(self, test_result, node_test_suite):
config = self.config
log = self.log
if log is None:
log = self.log
if node_test_suite.revision != test_result.revision:
log('Disagreement on tested revision, checking out: %r' % (
(node_test_suite.revision,test_result.revision),))
for i, repository_revision in enumerate(test_result.revision.split(',')):
if node_test_suite.revision == test_result.revision:
return
logger.info('Disagreement on tested revision, checking out: %r != %r',
node_test_suite.revision, test_result.revision)
updater_kw = dict(git_binary=self.config['git_binary'],
process_manager=self.process_manager)
revision_list = []
for i, revision in enumerate(test_result.revision.split(',')):
vcs_repository = node_test_suite.vcs_repository_list[i]
repository_path = vcs_repository['repository_path']
revision = repository_revision.rsplit('-', 1)[1]
......@@ -250,7 +227,7 @@ develop = false
folder_path = os.path.join(log_directory, log_folder)
if os.path.isdir(folder_path):
if os.stat(folder_path).st_mtime < prune_time:
self.log("deleting log directory %r", folder_path)
logger.debug("deleting log directory %r", folder_path)
shutil.rmtree(folder_path)
def _cleanupTemporaryFiles(self):
......@@ -268,22 +245,21 @@ develop = false
try:
stat = os.stat(folder_path)
if stat.st_uid == user_id and stat.st_mtime < prune_time:
self.log("deleting temp directory %r", folder_path)
logger.debug("deleting temp directory %r", folder_path)
if os.path.isdir(folder_path):
shutil.rmtree(folder_path)
else:
os.remove(folder_path)
except OSError:
self.log("_cleanupTemporaryFiles exception", exc_info=sys.exc_info())
logger.warning("_cleanupTemporaryFiles exception", exc_info=1)
def cleanUp(self):
self.log('Testnode.cleanUp')
logger.debug('Testnode.cleanUp')
self.process_manager.killPreviousRun()
self._cleanupLog()
self._cleanupTemporaryFiles()
def run(self):
log = self.log
config = self.config
portal_url = config['test_suite_master_url']
test_node_slapos = SlapOSInstance(config['slapos_directory'])
......@@ -292,11 +268,10 @@ develop = false
test_result = None
try:
node_test_suite = None
self.log = self.process_manager.log = self.testnode_log
self.cleanUp()
begin = time.time()
taskdistributor = taskdistribution.TaskDistributor(
portal_url, logger=DummyLogger(log))
portal_url, logger=logger)
self.test_suite_portal = taskdistributor # XXX ScalabilityTest
node_configuration = taskdistributor.subscribeNode(
node_title=config['test_node_title'],
......@@ -308,7 +283,7 @@ develop = false
'process_timeout' in node_configuration \
and node_configuration['process_timeout'] is not None:
process_timeout = node_configuration['process_timeout']
log('Received and using process timeout from master: %i',
logger.info('Received and using process timeout from master: %i',
process_timeout)
self.process_manager.max_timeout = process_timeout
test_suite_data = taskdistributor.startTestSuite(
......@@ -318,22 +293,23 @@ develop = false
# Backward compatiblity
test_suite_data = json.loads(test_suite_data)
test_suite_data = deunicodeData(test_suite_data)
log("Got following test suite data from master : %r",
logger.info("Got following test suite data from master : %r",
test_suite_data)
try:
my_test_type = taskdistributor.getTestType()
except Exception:
log("testnode, error during requesting getTestType() method"
" from the distributor.")
logger.warning("testnode, error during requesting getTestType()"
" method from the distributor.")
raise
# Select runner according to the test type
try:
runner_class = test_type_registry[my_test_type]
except KeyError:
log("testnode, Runner type %s not implemented.", my_test_type)
logger.warning("testnode, Runner type %s not implemented.",
my_test_type)
raise NotImplementedError
runner = runner_class(self)
log("Type of current test is %s", my_test_type)
logger.info("Type of current test is %s", my_test_type)
# master testnode gets test_suites, slaves get nothing
runner.prepareSlapOSForTestNode(test_node_slapos)
# Clean-up test suites
......@@ -356,14 +332,18 @@ develop = false
config['test_node_title'], False,
node_test_suite.test_suite_title,
node_test_suite.project_title)
log("testnode, test_result : %r", test_result)
if test_result is not None:
self.registerSuiteLog(test_result, node_test_suite)
logger.info("testnode, test_result : %r", test_result)
if test_result is None:
self.cleanUp() # XXX not a good place to do that
continue
with self.suiteLog(node_test_suite) as suite_log_folder_name:
test_result.reportStatus('LOG url', "%s/%s" % (
config.get('httpd_url'), suite_log_folder_name), '')
self.checkRevision(test_result,node_test_suite)
node_test_suite.edit(test_result=test_result)
# get cluster configuration for this test suite, this is needed to
# know slapos parameters to user for creating instances
log("Getting configuration from test suite %s",
logger.info("Getting configuration from test suite %s",
node_test_suite.test_suite_title)
generated_config = taskdistributor.generateConfiguration(
node_test_suite.test_suite_title)
......@@ -398,16 +378,15 @@ develop = false
test_result.reportFailure(
stdout=error_message
)
self.log(error_message)
logger.error(error_message)
raise ValueError(error_message)
else:
raise NotImplementedError
# break the loop to get latest priorities from master
break
self.cleanUp()
except (SubprocessError, CalledProcessError, ConnectionError) as e:
log("", exc_info=1)
logger.exception("")
if test_result is not None:
status_dict = getattr(e, "status_dict", None) or {
'stderr': "%s: %s" % (e.__class__.__name__, e)}
......@@ -419,7 +398,7 @@ develop = false
continue
except ValueError as e:
# This could at least happens if runTestSuite is not found
log("ValueError : %r" % (e,), exc_info=sys.exc_info())
logger.exception("")
if node_test_suite is not None:
node_test_suite.retry_software_count += 1
if test_result is not None:
......@@ -427,24 +406,27 @@ develop = false
command='', stdout='',
stderr="ValueError was raised : %s" % (e,),
)
except CancellationError, e:
log("CancellationError", exc_info=sys.exc_info())
except CancellationError:
logger.exception("")
self.process_manager.under_cancellation = False
node_test_suite.retry = True
continue
self.cleanUp()
sleep_time = 120 - (time.time() - begin)
if sleep_time > 0:
log("End of processing, going to sleep %s", sleep_time)
logger.info("End of processing, going to sleep %s", sleep_time)
time.sleep(sleep_time)
except Exception as e:
log("Exception in error handling : %r" % (e,), exc_info=sys.exc_info())
except Exception:
logger.exception("")
except:
logger.exception("")
raise
finally:
if 'tb' in locals():
del tb
# Nice way to kill *everything* generated by run process -- process
# groups working only in POSIX compilant systems
# Exceptions are swallowed during cleanup phase
log("GENERAL EXCEPTION, QUITING")
logger.info("GENERAL EXCEPTION, QUITING")
self.cleanUp()
log("GENERAL EXCEPTION, QUITING, cleanup finished")
logger.info("GENERAL EXCEPTION, QUITING, cleanup finished")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment