Commit 518df332 authored by Julien Muchembled's avatar Julien Muchembled Committed by Klaus Wölfel

testnode: some code clean up

A lot was found with pylint.
parent bb63c34b
......@@ -235,7 +235,7 @@ class ERP5TestNode(TestCase):
node_test_suite.custom_profile_path)
profile = open(node_test_suite.custom_profile_path, 'r')
if my_test_type=='UnitTest':
expected_profile = """
expected_profile = """\
[buildout]
extends = %(temp_dir)s/testnode/foo/rep0/software.cfg
......@@ -254,7 +254,7 @@ develop = false
else:
revision1 = "azerty"
revision2 = "qwerty"
expected_profile = """
expected_profile = """\
[buildout]
extends = %(temp_dir)s/testnode/foo/rep0/software.cfg
......@@ -818,17 +818,17 @@ develop = false
test_node.node_test_suite_dict
rand_part_set = set()
self.assertEquals(2, len(test_node.node_test_suite_dict))
assert(test_node.suite_log is not None)
assert(isinstance(test_node.suite_log, types.MethodType))
self.assertIsNot(test_node.suite_log, None)
self.assertTrue(isinstance(test_node.suite_log, types.MethodType))
for ref, suite in test_node.node_test_suite_dict.items():
self.assertTrue('var/log/testnode/%s' % suite.reference in \
suite.suite_log_path,
"Incorrect suite log path : %r" % suite.suite_log_path)
assert(suite.suite_log_path.endswith('suite.log'))
self.assertTrue(suite.suite_log_path.endswith('suite.log'))
m = re.match('.*\-(.*)\/suite.log', suite.suite_log_path)
rand_part = m.groups()[0]
assert(len(rand_part) == 32)
assert(rand_part not in rand_part_set)
self.assertEqual(len(rand_part), 32)
self.assertNotIn(rand_part, rand_part_set)
rand_part_set.add(rand_part)
suite_log = open(suite.suite_log_path, 'r')
self.assertEquals(1, len([x for x in suite_log.readlines() \
......
......@@ -24,23 +24,13 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from datetime import datetime,timedelta
import errno
import os
import subprocess
import sys
import time
import glob
import SlapOSControler
import json
import time
import shutil
import logging
import string
import random
from ProcessManager import SubprocessError, ProcessManager, CancellationError
from subprocess import CalledProcessError
from Updater import Updater
from erp5.util import taskdistribution
from .Utils import createFolder
class SlapOSInstance(object):
"""
......@@ -57,7 +47,7 @@ class SlapOSInstance(object):
def _checkData(self):
pass
class NodeTestSuite(SlapOSInstance):
"""
......@@ -67,22 +57,19 @@ class NodeTestSuite(SlapOSInstance):
self.reference = reference
self.cluster_configuration = {}
def edit(self, **kw):
super(NodeTestSuite, self).edit(**kw)
def _checkData(self):
if getattr(self, "working_directory", None) is not None:
if not(self.working_directory.endswith(os.path.sep + self.reference)):
self.working_directory = os.path.join(self.working_directory,
self.reference)
SlapOSControler.createFolder(self.working_directory)
createFolder(self.working_directory)
self.test_suite_directory = os.path.join(
self.working_directory, "test_suite")
self.custom_profile_path = os.path.join(self.working_directory,
'software.cfg')
if getattr(self, "vcs_repository_list", None) is not None:
for vcs_repository in self.vcs_repository_list:
buildout_section_id = vcs_repository.get('buildout_section_id', None)
buildout_section_id = vcs_repository.get('buildout_section_id')
repository_id = buildout_section_id or \
vcs_repository.get('url').split('/')[-1].split('.')[0]
repository_path = os.path.join(self.working_directory,repository_id)
......@@ -92,18 +79,20 @@ class NodeTestSuite(SlapOSInstance):
def createSuiteLog(self):
# /srv/slapgrid/slappartXX/srv/var/log/testnode/az-mlksjfmlk234Sljssdflkj23KSdfslj/suite.log
alphabets = string.digits + string.letters
rand_part = ''.join(random.choice(alphabets) for i in xrange(32))
random_suite_folder_id = '%s-%s' % (self.reference, rand_part)
suite_log_directory = os.path.join(self.log_directory,
random_suite_folder_id)
SlapOSControler.createFolders(suite_log_directory)
while 1:
log_folder_name = '%s-%s' % (self.reference,
''.join(random.choice(alphabets) for i in xrange(32)))
log_folder_path = os.path.join(self.log_directory, log_folder_name)
try:
os.makedirs(log_folder_path)
except OSError, e:
if e.errno != errno.EEXIST:
raise
else:
break
# XXX copy the whole content of the log viewer app
for fname in glob.glob(os.path.join(os.path.dirname(__file__), 'js-logtail', '*')):
shutil.copy(fname, suite_log_directory)
self.suite_log_path = os.path.join(suite_log_directory,
'suite.log')
return self.getSuiteLogPath(), random_suite_folder_id
def getSuiteLogPath(self):
return getattr(self,"suite_log_path", None)
shutil.copy(fname, log_folder_path)
self.suite_log_path = os.path.join(log_folder_path, 'suite.log')
return self.suite_log_path, log_folder_name
......@@ -131,14 +131,14 @@ class ProcessManager(object):
stdin = file(os.devnull)
def __init__(self, log, *args, **kw):
def __init__(self, log, max_timeout=MAX_TIMEOUT):
self.log = log
self.process_pid_set = set()
signal.signal(signal.SIGTERM, self.sigterm_handler)
self.under_cancellation = False
self.p = None
self.result = None
self.max_timeout = kw.get("max_timeout") or MAX_TIMEOUT
self.max_timeout = max_timeout
self.timer_set = set()
def spawn(self, *args, **kw):
......@@ -186,10 +186,9 @@ class ProcessManager(object):
raise SubprocessError(result)
return result
def getSupportedParameterSet(self, program_path ,parameter_list):
help_string = self.spawn(*[program_path,'--help'])['stdout']
help_words = set(help_string.split())
return help_words.intersection(set(parameter_list))
def getSupportedParameterList(self, program_path):
return re.findall(r'^ (--\w+)',
self.spawn(program_path, '--help')['stdout'], re.M)
def killall(self, name):
"""
......@@ -212,13 +211,15 @@ class ProcessManager(object):
continue
except (psutil.AccessDenied, psutil.NoSuchProcess):
continue
self.log('ProcesssManager, killall on %s having pid %s' % (name, process.pid))
self.log('ProcesssManager, killall on %s having pid %s',
name, process.pid)
to_kill_list.append(process.pid)
for pid in to_kill_list:
killCommand(pid, self.log)
def killPreviousRun(self, cancellation=False):
self.log('ProcessManager killPreviousRun, going to kill %r' % (self.process_pid_set,))
self.log('ProcessManager killPreviousRun, going to kill %r',
self.process_pid_set)
if cancellation:
self.under_cancellation = True
for timer in self.timer_set:
......@@ -227,12 +228,13 @@ class ProcessManager(object):
killCommand(pgpid, self.log)
try:
if os.path.exists(self.supervisord_pid_file):
supervisor_pid = int(open(self.supervisord_pid_file).read().strip())
self.log('ProcessManager killPreviousRun, going to kill supervisor with pid %r' % supervisor_pid)
with open(self.supervisord_pid_file) as f:
supervisor_pid = int(f.read().strip())
self.log('ProcessManager killPreviousRun, going to kill supervisor with pid %r',
supervisor_pid)
os.kill(supervisor_pid, signal.SIGTERM)
except:
except Exception:
self.log('ProcessManager killPreviousRun, exception when killing supervisor')
pass
self.process_pid_set.clear()
def sigterm_handler(self, signal, frame):
......
......@@ -30,40 +30,14 @@ import slapos.slap
import subprocess
import time
import xml_marshaller
import shutil
import sys
import glob
import argparse
import json
from slapos import client
from .Utils import createFolder
MAX_PARTIONS = 10
MAX_PARTITIONS = 10
MAX_SR_RETRIES = 3
def createFolder(folder, clean=False):
if clean and os.path.exists(folder):
shutil.rmtree(folder)
if not(os.path.exists(folder)):
os.mkdir(folder)
def createFolders(folder):
if not(os.path.exists(folder)):
os.makedirs(folder)
def isDir(folder):
return os.path.isdir(folder)
def createFile(path, mode, content):
f = open(path, mode)
if os.path.exists(path):
f.write(content)
f.close()
else:
# error
pass
class SlapOSControler(object):
def __init__(self, working_directory, config, log):
......@@ -91,9 +65,12 @@ class SlapOSControler(object):
slapos_url,
slapos_account_certificate_path,
slapos_account_key_path)
createFile(slapos_account_key_path, "w", key)
createFile(slapos_account_certificate_path, "w", certificate)
createFile(configuration_file_path, "w", configuration_file_value)
with open(slapos_account_key_path, "w") as f:
f.write(key)
with open(slapos_account_certificate_path, "w") as f:
f.write(certificate)
with open(configuration_file_path, "w") as f:
f.write(configuration_file_value)
self.configuration_file_path = configuration_file_path
return slapos_account_key_path, slapos_account_certificate_path, configuration_file_path
......@@ -115,26 +92,17 @@ class SlapOSControler(object):
try:
local = client.init(config)
local['supply'](software_url, computer_guid=computer_id, state=state)
self.log('SlapOSControler : supply %s %s %s' %(software_url, computer_id, state))
except:
self.log("SlapOSControler.supply, \
exception in registerOpenOrder", exc_info=sys.exc_info())
self.log('SlapOSControler : supply %s %s %s', software_url, computer_id, state)
except Exception:
self.log("SlapOSControler.supply", exc_info=sys.exc_info())
raise ValueError("Unable to supply (or remove)")
else:
raise ValueError("Configuration file not found.")
def destroy(self, software_url, computer_id):
"""
Request Deletetion of a software release on a specific node
Ex :
my_controler.destroy('kvm.cfg', 'COMP-726')
"""
self.supply(self, software_url, computer_id, state="destroyed")
def getInstanceRequestedState(self, reference):
try:
return self.instance_config[reference]['requested_state']
except:
except Exception:
raise ValueError("Instance '%s' not exist" %self.instance_config[reference])
def request(self, reference, software_url, software_type=None,
......@@ -182,11 +150,11 @@ class SlapOSControler(object):
self.instance_config[reference]['partition'] = partition
if state == 'destroyed':
del self.instance_config[reference]
if state == 'started':
self.log('Instance started with configuration: %s' %str(software_configuration))
except:
self.log("SlapOSControler.request, \
exception in registerOpenOrder", exc_info=sys.exc_info())
elif state == 'started':
self.log('Instance started with configuration: %s',
software_configuration)
except Exception:
self.log("SlapOSControler.request", exc_info=sys.exc_info())
raise ValueError("Unable to do this request")
else:
raise ValueError("Configuration file not found.")
......@@ -204,31 +172,30 @@ class SlapOSControler(object):
self.log('SlapOSControler : delete instance')
try:
self._requestSpecificState(reference, 'destroyed')
except:
raise ValueError("Can't delete instance '%s' (instance may not been created?)" %reference)
except Exception:
raise ValueError("Can't delete instance %r (instance not created?)" % reference)
def stopInstance(self, reference):
self.log('SlapOSControler : stop instance')
try:
self._requestSpecificState(reference, 'stopped')
except:
raise ValueError("Can't stop instance '%s' (instance may not been created?)" %reference)
except Exception:
raise ValueError("Can't stop instance %r (instance not created?)" % reference)
def startInstance(self, reference):
self.log('SlapOSControler : start instance')
try:
self._requestSpecificState(reference, 'started')
except:
raise ValueError("Can't start instance '%s' (instance may not been created?)" %reference)
except Exception:
raise ValueError("Can't start instance %r (instance not created?)" % reference)
def updateInstanceXML(self, reference, software_configuration):
"""
Update the XML configuration of an instance
# Request same instance with different parameters.
"""
self.log('SlapOSControler : updateInstanceXML')
self.log('SlapOSControler : updateInstanceXML will request same'
'instance with new XML configuration...')
' instance with new XML configuration...')
try:
self.request(reference,
......@@ -238,32 +205,28 @@ class SlapOSControler(object):
self.instance_config[reference]['computer_guid'],
state='started'
)
except:
except Exception:
raise ValueError("Can't update instance '%s' (may not exist?)" %reference)
def _resetSoftware(self):
self.log('SlapOSControler : GOING TO RESET ALL SOFTWARE : %r' %
(self.software_root,))
if os.path.exists(self.software_root):
shutil.rmtree(self.software_root)
os.mkdir(self.software_root)
os.chmod(self.software_root, 0750)
self.log('SlapOSControler : GOING TO RESET ALL SOFTWARE : %r',
self.software_root)
createFolder(self.software_root, True)
def initializeSlapOSControler(self, slapproxy_log=None, process_manager=None,
reset_software=False, software_path_list=None):
self.process_manager = process_manager
self.software_path_list = software_path_list
self.log('SlapOSControler, initialize, reset_software: %r' % reset_software)
self.log('SlapOSControler, initialize, reset_software: %r', reset_software)
config = self.config
slapos_config_dict = self.config.copy()
slapos_config_dict.update(software_root=self.software_root,
instance_root=self.instance_root,
proxy_database=self.proxy_database)
open(self.slapos_config, 'w').write(pkg_resources.resource_string(
with open(self.slapos_config, 'w') as f:
f.write(pkg_resources.resource_string(
'erp5.util.testnode', 'template/slapos.cfg.in') %
slapos_config_dict)
createFolder(self.software_root)
createFolder(self.instance_root)
# By erasing everything, we make sure that we are able to "update"
# existing profiles. This is quite dirty way to do updates...
if os.path.exists(self.proxy_database):
......@@ -282,32 +245,29 @@ class SlapOSControler(object):
# connections
time.sleep(10)
try:
slap = slapos.slap.slap()
self.slap = slap
self.slap.initializeConnection(config['master_url'])
slap = self.slap = slapos.slap.slap()
slap.initializeConnection(config['master_url'])
# register software profile
for path in self.software_path_list:
slap.registerSupply().supply(
path,
computer_guid=config['computer_id'])
computer = slap.registerComputer(config['computer_id'])
except:
self.log("SlapOSControler.initializeSlapOSControler, \
exception in registerSupply", exc_info=sys.exc_info())
except Exception:
self.log("SlapOSControler.initializeSlapOSControler",
exc_info=sys.exc_info())
raise ValueError("Unable to registerSupply")
# Reset all previously generated software if needed
if reset_software:
self._resetSoftware()
else:
createFolder(self.software_root)
instance_root = self.instance_root
if os.path.exists(instance_root):
# delete old paritions which may exists in order to not get its data
# (ex. MySQL db content) from previous testnode's runs
# In order to be able to change partition naming scheme, do this at
# instance_root level (such change happened already, causing problems).
shutil.rmtree(instance_root)
if not(os.path.exists(instance_root)):
os.mkdir(instance_root)
for i in range(0, MAX_PARTIONS):
# Delete any existing partition in order to not get its data (ex.
# MySQL DB content) from previous runs. To support changes of partition
# naming scheme (which already happened), do this at instance_root level.
createFolder(instance_root, True)
for i in xrange(MAX_PARTITIONS):
# create partition and configure computer
# XXX: at the moment all partitions do share same virtual interface address
# this is not a problem as usually all services are on different ports
......@@ -336,17 +296,17 @@ class SlapOSControler(object):
def runSoftwareRelease(self, config, environment, **kw):
self.log("SlapOSControler.runSoftwareRelease")
cpu_count = os.sysconf("SC_NPROCESSORS_ONLN")
os.putenv('MAKEFLAGS', '-j%s' % cpu_count)
os.putenv('NPY_NUM_BUILD_JOBS', '%s' % cpu_count)
os.putenv('BUNDLE_JOBS', '%s' % cpu_count)
cpu_count = str(os.sysconf("SC_NPROCESSORS_ONLN"))
os.environ['MAKEFLAGS'] = '-j' + cpu_count
os.environ['NPY_NUM_BUILD_JOBS'] = cpu_count
os.environ['BUNDLE_JOBS'] = cpu_count
os.environ['PATH'] = environment['PATH']
# a SR may fail for number of reasons (incl. network failures)
# so be tolerant and run it a few times before giving up
for runs in range(0, MAX_SR_RETRIES):
for _ in xrange(MAX_SR_RETRIES):
status_dict = self.spawn(config['slapos_binary'],
'node', 'software', '--all',
'--pidfile', '%s/software.pid' % self.software_root,
'--pidfile', os.path.join(self.software_root, 'slapos-node.pid'),
'--cfg', self.slapos_config, raise_error_if_fail=False,
log_prefix='slapgrid_sr', get_output=False)
if status_dict['status_code'] == 0:
......@@ -355,32 +315,32 @@ class SlapOSControler(object):
def runComputerPartition(self, config, environment,
stdout=None, stderr=None, cluster_configuration=None, **kw):
self.log("SlapOSControler.runComputerPartition with cluster_config: %r" % (cluster_configuration,))
self.log("SlapOSControler.runComputerPartition with cluster_config: %r",
cluster_configuration)
for path in self.software_path_list:
try:
self.slap.registerOpenOrder().request(path,
partition_reference='testing partition %s' % \
self.software_path_list.index(path),
partition_parameter_kw=cluster_configuration)
except:
self.log("SlapOSControler.runComputerPartition, \
exception in registerOpenOrder", exc_info=sys.exc_info())
except Exception:
self.log("SlapOSControler.runComputerPartition", exc_info=sys.exc_info())
raise ValueError("Unable to registerOpenOrder")
# try to run for all partitions as one partition may in theory request another one
# this not always is required but curently no way to know how "tree" of partitions
# may "expand"
sleep_time = 0
for runs in range(0, MAX_PARTIONS):
for _ in xrange(MAX_PARTITIONS):
status_dict = self.spawn(config['slapos_binary'], 'node', 'instance',
'--pidfile', '%s/instance.pid' % self.software_root,
'--pidfile', os.path.join(self.instance_root, 'slapos-node.pid'),
'--cfg', self.slapos_config, raise_error_if_fail=False,
log_prefix='slapgrid_cp', get_output=False)
self.log('slapgrid_cp status_dict : %r' % (status_dict,))
if status_dict['status_code'] in (0,):
self.log('slapgrid_cp status_dict : %r', status_dict)
if not status_dict['status_code']:
break
# some hack to handle promise issues (should be only one of the two
# codes, but depending on slapos versions, we have inconsistent status
if status_dict['status_code'] in (1,2):
status_dict['status_code'] = 0
else:
# some hack to handle promise issues (should be only one of the two
# codes, but depending on slapos versions, we have inconsistent status
if status_dict['status_code'] in (1,2):
status_dict['status_code'] = 0
return status_dict
import datetime
import json
import sys
import traceback
import time
#import feedparser
from functools import wraps
from uritemplate import expand
import slapos.slap
from slapos.slap import SoftwareProductCollection
from slapos.slap.slap import ConnectionError
from requests.exceptions import HTTPError
from erp5.util.taskdistribution import SAFE_RPC_EXCEPTION_LIST
from ..taskdistribution import SAFE_RPC_EXCEPTION_LIST
# max time to instance changing state: 2 hour
MAX_INSTANCE_TIME = 60*60*2
......@@ -41,36 +39,29 @@ TESTER_STATE_INSTANCE_UNINSTALLED = "TESTER_STATE_INSTANCE_UNINSTALLED"
# Simple decorator to prevent raise due small
# network failures.
def retryOnNetworkFailure(func):
def wrapper(*args, **kwargs):
def retryOnNetworkFailure(func,
_except_list = SAFE_RPC_EXCEPTION_LIST + (
HTTPError, slapos.slap.ConnectionError),
):
def wrapper(*args, **kw):
retry_time = 64
while True:
try:
return func(*args, **kwargs)
except SAFE_RPC_EXCEPTION_LIST, e:
print 'Network failure: %s , %s' % (sys.exc_info(), e)
except HTTPError, e:
print 'Network failure: %s , %s' % (sys.exc_info(), e)
except ConnectionError, e:
print 'Network failure: %s , %s' % (sys.exc_info(), e)
except slapos.slap.ConnectionError, e:
print 'Network failure: %s , %s' % (sys.exc_info(), e)
print 'Retry method %s in %i seconds' % (func, retry_time)
return func(*args, **kw)
except _except_list:
traceback.print_exc()
print 'Network failure. Retry method %s in %i seconds' % (func, retry_time)
time.sleep(retry_time)
retry_time = min(retry_time*1.5, 640)
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper
return wraps(func)(wrapper)
class SlapOSMasterCommunicator(object):
latest_state = None
def __init__(self, slap, slap_supply, slap_order, url, logger):
self._logger = logger
self.slap = slap
self.slap_order = slap_order
......@@ -102,8 +93,7 @@ class SlapOSMasterCommunicator(object):
if instance_title is not None:
self.name = instance_title
if request_kw is not None:
if isinstance(request_kw, str) or \
isinstance(request_kw, unicode):
if isinstance(request_kw, basestring):
self.request_kw = json.loads(request_kw)
else:
self.request_kw = request_kw
......@@ -116,12 +106,11 @@ class SlapOSMasterCommunicator(object):
**self.request_kw)
def isInstanceRequested(self, instance_title):
hateoas = getattr(self.slap, '_hateoas_navigator', None)
hateoas = self._hateoas_navigator
return instance_title in hateoas.getHostingSubscriptionDict()
@retryOnNetworkFailure
def _hateoas_getComputer(self, reference):
root_document = self.hateoas_navigator.getRootDocument()
search_url = root_document["_links"]['raw_search']['href']
......@@ -147,7 +136,6 @@ class SlapOSMasterCommunicator(object):
@retryOnNetworkFailure
def getSoftwareInstallationList(self):
# XXX Move me to slap.py API
computer = self._hateoas_getComputer(self.computer_guid)
# Not a list ?
......@@ -191,7 +179,6 @@ class SlapOSMasterCommunicator(object):
@retryOnNetworkFailure
def getInstanceUrlList(self):
if self.hosting_subscription_url is None:
hosting_subscription_dict = self.hateoas_navigator._hateoas_getHostingSubscriptionDict()
for hs in hosting_subscription_dict:
......@@ -207,7 +194,6 @@ class SlapOSMasterCommunicator(object):
@retryOnNetworkFailure
def getNewsFromInstance(self, url):
result = self.hateoas_navigator.GET(url)
result = json.loads(result)
if result['_links'].get('action_object_slap', None) is None:
......@@ -221,7 +207,6 @@ class SlapOSMasterCommunicator(object):
@retryOnNetworkFailure
def getInformationFromInstance(self, url):
result = self.hateoas_navigator.GET(url)
result = json.loads(result)
if result['_links'].get('action_object_slap', None) is None:
......@@ -329,7 +314,7 @@ class SlapOSMasterCommunicator(object):
self._logger('Got an error requesting partition for '
'its state')
return INSTANCE_STATE_UNKNOWN
except:
except Exception:
self._logger("ERROR getting instance state")
return INSTANCE_STATE_UNKNOWN
......@@ -377,6 +362,7 @@ class SlapOSMasterCommunicator(object):
return {'error_message' : None}
class SlapOSTester(SlapOSMasterCommunicator):
def __init__(self,
name,
logger,
......@@ -387,7 +373,6 @@ class SlapOSTester(SlapOSMasterCommunicator):
computer_guid=None, # computer for supply if desired
request_kw=None
):
super(SlapOSTester, self).__init__(
slap, slap_supply, slap_order, url, logger)
......@@ -464,7 +449,6 @@ class SoftwareReleaseTester(SlapOSTester):
software_timeout=3600,
instance_timeout=3600,
):
super(SoftwareReleaseTester, self).__init__(
name, logger, slap, slap_order, slap_supply, url, computer_guid, request_kw)
......
......@@ -24,28 +24,22 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from datetime import datetime,timedelta
import os
import subprocess
import sys
import time
import glob
import SlapOSControler
import json
import time
import shutil
import logging
import string
import random
from ProcessManager import SubprocessError, ProcessManager, CancellationError
from subprocess import CalledProcessError
from NodeTestSuite import SlapOSInstance
from Updater import Updater
from Utils import dealShebang
from erp5.util import taskdistribution
from .ProcessManager import SubprocessError
from .SlapOSControler import SlapOSControler
from .Utils import createFolder
from slapos.grid.utils import md5digest
class UnitTestRunner():
def dealShebang(run_test_suite_path):
with open(run_test_suite_path) as f:
if f.read(2) == '#!':
return f.readline().split(None, 1)
return []
class UnitTestRunner(object):
def __init__(self, testnode):
self.testnode = testnode
......@@ -53,12 +47,11 @@ class UnitTestRunner():
"""
Create a SlapOSControler
"""
return SlapOSControler.SlapOSControler(
return SlapOSControler(
working_directory,
self.testnode.config,
self.testnode.log)
def _prepareSlapOS(self, working_directory, slapos_instance, log,
create_partition=1, software_path_list=None, **kw):
"""
......@@ -66,11 +59,11 @@ class UnitTestRunner():
"""
slapproxy_log = os.path.join(self.testnode.config['log_directory'],
'slapproxy.log')
log('Configured slapproxy log to %r' % slapproxy_log)
log('Configured slapproxy log to %r', slapproxy_log)
reset_software = slapos_instance.retry_software_count > 10
if reset_software:
slapos_instance.retry_software_count = 0
log('testnode, retry_software_count : %r' % \
log('testnode, retry_software_count : %r',
slapos_instance.retry_software_count)
# XXX Create a new controler because working_directory can be
......@@ -108,26 +101,22 @@ class UnitTestRunner():
"""
# report-url, report-project and suite-url are required to seleniumrunner
# instance. This is a hack which must be removed.
cluster_configuration = {}
config = self.testnode.config
cluster_configuration['report-url'] = config.get("report-url", "")
cluster_configuration['report-project'] = config.get("report-project", "")
cluster_configuration['suite-url'] = config.get("suite-url", "")
return self._prepareSlapOS(self.testnode.config['slapos_directory'],
return self._prepareSlapOS(config['slapos_directory'],
test_node_slapos, self.testnode.log, create_partition=0,
software_path_list=self.testnode.config.get("software_list"),
cluster_configuration=cluster_configuration
)
software_path_list=config.get("software_list"),
cluster_configuration={
'report-url': config.get("report-url", ""),
'report-project': config.get("report-project", ""),
'suite-url': config.get("suite-url", ""),
})
def prepareSlapOSForTestSuite(self, node_test_suite):
"""
Build softwares needed by testsuites
"""
log = self.testnode.log
if log is None:
log = self.testnode.log
return self._prepareSlapOS(node_test_suite.working_directory,
node_test_suite, log,
node_test_suite, self.testnode.log,
software_path_list=[node_test_suite.custom_profile_path],
cluster_configuration={'_': json.dumps(node_test_suite.cluster_configuration)})
......@@ -173,8 +162,7 @@ class UnitTestRunner():
# From this point, test runner becomes responsible for updating test
# result. We only do cleanup if the test runner itself is not able
# to run.
SlapOSControler.createFolder(node_test_suite.test_suite_directory,
clean=True)
createFolder(node_test_suite.test_suite_directory, clean=True)
self.testnode.process_manager.spawn(*invocation_list,
cwd=node_test_suite.test_suite_directory,
log_prefix='runTestSuite', get_output=False)
......
......@@ -28,11 +28,8 @@ import errno
import os
import re
import shutil
import subprocess
import sys
import threading
from ProcessManager import SubprocessError
from .ProcessManager import SubprocessError
SVN_UP_REV = re.compile(r'^(?:At|Updated to) revision (\d+).$')
SVN_CHANGED_REV = re.compile(r'^Last Changed Rev.*:\s*(\d+)', re.MULTILINE)
......@@ -44,9 +41,8 @@ SVN_TYPE = 'svn'
class Updater(object):
_git_cache = {}
stdin = file(os.devnull)
def __init__(self, repository_path, log, revision=None, git_binary=None,
def __init__(self, repository_path, log, revision=None, git_binary='git',
branch=None, realtime_output=True, process_manager=None, url=None,
working_directory=None):
self.log = log
......@@ -81,7 +77,7 @@ class Updater(object):
def deletePycFiles(self, path):
"""Delete *.pyc files so that deleted/moved files can not be imported"""
for path, dir_list, file_list in os.walk(path):
for path, _, file_list in os.walk(path):
for file in file_list:
if file[-4:] in ('.pyc', '.pyo'):
# allow several processes clean the same folder at the same time
......@@ -115,17 +111,17 @@ class Updater(object):
git_repository_path = os.path.join(self.getRepositoryPath(), '.git')
name = os.path.basename(os.path.normpath(self.getRepositoryPath()))
git_repository_link_path = os.path.join(self.getRepositoryPath(), '%s.git' %name)
self.log("checking link %s -> %s.."
%(git_repository_link_path,git_repository_path))
self.log("checking link %s -> %s..",
git_repository_link_path, git_repository_path)
if ( not os.path.lexists(git_repository_link_path) and \
not os.path.exists(git_repository_link_path) ):
try:
os.symlink(git_repository_path, git_repository_link_path)
self.log("link: %s -> %s created"
%(git_repository_link_path,git_repository_path))
except:
self.log("Cannot create link from %s -> %s"
%(git_repository_link_path,git_repository_path))
self.log("link: %s -> %s created",
git_repository_link_path, git_repository_path)
except OSError:
self.log("Cannot create link from %s -> %s",
git_repository_link_path, git_repository_path)
def _git_find_rev(self, ref):
try:
......@@ -152,7 +148,7 @@ class Updater(object):
raise NotImplementedError
def deleteRepository(self):
self.log("Wrong repository or wrong url, deleting repos %s" % \
self.log("Wrong repository or wrong url, deleting repos %s",
self.repository_path)
shutil.rmtree(self.repository_path)
......@@ -165,7 +161,7 @@ class Updater(object):
remote_url = self._git("config", "--get", "remote.origin.url")
if remote_url == self.url:
correct_url = True
except (SubprocessError,) as e:
except SubprocessError:
self.log("SubprocessError", exc_info=sys.exc_info())
if not(correct_url):
self.deleteRepository()
......
import sys
import json
import os
import shutil
import string
from random import choice
def createFolder(folder, clean=False):
if os.path.exists(folder):
if not clean:
return
shutil.rmtree(folder)
os.mkdir(folder)
def deunicodeData(data):
if isinstance(data, list):
new_data = []
for sub_data in data:
new_data.append(deunicodeData(sub_data))
elif isinstance(data, unicode):
new_data = data.encode('utf8')
elif isinstance(data, dict):
new_data = {}
for key, value in data.iteritems():
key = deunicodeData(key)
value = deunicodeData(value)
new_data[key] = value
else:
new_data = data
return new_data
def dealShebang(run_test_suite_path):
line = open(run_test_suite_path, 'r').readline()
invocation_list = []
if line[:2] == '#!':
invocation_list = line[2:].split()
return invocation_list
return map(deunicodeData, data)
if isinstance(data, unicode):
return data.encode('utf8')
if isinstance(data, dict):
return {deunicodeData(key): deunicodeData(value)
for key, value in data.iteritems()}
return data
......@@ -26,30 +26,24 @@
##############################################################################
import os
import sys
import time
import json
import time
import shutil
import logging
import Utils
from slapos.slap.slap import ConnectionError
import traceback
from ProcessManager import SubprocessError, ProcessManager, CancellationError
from .ProcessManager import SubprocessError, ProcessManager, CancellationError
from subprocess import CalledProcessError
from Updater import Updater
from NodeTestSuite import NodeTestSuite, SlapOSInstance
from ScalabilityTestRunner import ScalabilityTestRunner
from UnitTestRunner import UnitTestRunner
from erp5.util import taskdistribution
from .Updater import Updater
from .NodeTestSuite import NodeTestSuite, SlapOSInstance
from .ScalabilityTestRunner import ScalabilityTestRunner
from .UnitTestRunner import UnitTestRunner
from .Utils import deunicodeData
from .. import taskdistribution
DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep
MAX_LOG_TIME = 15 # time in days we should keep logs that we can see through
# httd
MAX_TEMP_TIME = 0.01 # time in days we should keep temp files
supervisord_pid_file = None
PROFILE_PATH_KEY = 'profile_path'
......@@ -76,7 +70,6 @@ class TestNode(object):
def checkOldTestSuite(self,test_suite_data):
config = self.config
installed_reference_set = set(os.listdir(self.working_directory))
wished_reference_set = set([x['test_suite_reference'] for x in test_suite_data])
to_remove_reference_set = installed_reference_set.difference(
......@@ -84,11 +77,11 @@ class TestNode(object):
for y in to_remove_reference_set:
fpath = os.path.join(self.working_directory,y)
self.delNodeTestSuite(y)
self.log("testnode.checkOldTestSuite, DELETING : %r" % (fpath,))
self.log("testnode.checkOldTestSuite, DELETING : %r", fpath)
if os.path.isdir(fpath):
shutil.rmtree(fpath)
shutil.rmtree(fpath)
else:
os.remove(fpath)
os.remove(fpath)
def getNodeTestSuite(self, reference):
node_test_suite = self.node_test_suite_dict.get(reference)
......@@ -103,26 +96,21 @@ class TestNode(object):
return node_test_suite
def delNodeTestSuite(self, reference):
if self.node_test_suite_dict.has_key(reference):
self.node_test_suite_dict.pop(reference)
self.node_test_suite_dict.pop(reference, None)
def constructProfile(self, node_test_suite, test_type, use_relative_path=False):
config = self.config
profile_content = ''
assert len(node_test_suite.vcs_repository_list), "we must have at least one repository"
profile_path_count = 0
software_config_path = None
profile_content_list = []
for vcs_repository in node_test_suite.vcs_repository_list:
url = vcs_repository['url']
buildout_section_id = vcs_repository.get('buildout_section_id', None)
buildout_section_id = vcs_repository.get('buildout_section_id')
repository_path = vcs_repository['repository_path']
try:
profile_path = vcs_repository[PROFILE_PATH_KEY]
except KeyError:
pass
else:
profile_path_count += 1
if profile_path_count > 1:
if software_config_path is not None:
raise ValueError(PROFILE_PATH_KEY + ' defined more than once')
# Absolute path to relative path
......@@ -132,12 +120,6 @@ class TestNode(object):
node_test_suite.reference)
software_config_path = os.path.relpath(software_config_path, from_path)
profile_content_list.append("""
[buildout]
extends = %(software_config_path)s
""" % {'software_config_path': software_config_path})
# Construct sections
if not(buildout_section_id is None):
# Absolute path to relative
......@@ -175,14 +157,14 @@ develop = false
""" % {'buildout_section_id': buildout_section_id,
'repository_path' : repository_path,
'branch' : vcs_repository.get('branch','master')})
if not profile_path_count:
if software_config_path is None:
raise ValueError(PROFILE_PATH_KEY + ' not defined')
profile_content_list.sort()
# Write file
custom_profile = open(node_test_suite.custom_profile_path, 'w')
# sort to have buildout section first
profile_content_list.sort(key=lambda x: [x, ''][x.startswith('\n[buildout]')])
custom_profile.write(''.join(profile_content_list))
custom_profile.close()
with open(node_test_suite.custom_profile_path, 'w') as f:
f.write("[buildout]\nextends = %s\n%s" % (
software_config_path,
''.join(profile_content_list)))
sys.path.append(repository_path)
def getAndUpdateFullRevisionList(self, node_test_suite):
......@@ -200,13 +182,13 @@ develop = false
working_directory=node_test_suite.working_directory,
url=vcs_repository["url"])
updater.checkout()
revision = "-".join(updater.getRevision())
full_revision_list.append('%s=%s' % (repository_id, revision))
node_test_suite.revision = ','.join(full_revision_list)
except SubprocessError, e:
log("Error while getting repository, ignoring this test suite : %r" % (e,), exc_info=sys.exc_info())
full_revision_list = None
return full_revision_list
revision_list.append((repository_id, updater.getRevision()))
except SubprocessError:
log("Error while getting repository, ignoring this test suite",
exc_info=1)
return False
node_test_suite.revision_list = revision_list
return True
def registerSuiteLog(self, test_result, node_test_suite):
"""
......@@ -218,7 +200,7 @@ develop = false
# TODO make the path into url
test_result.reportStatus('LOG url', "%s/%s" % (self.config.get('httpd_url'),
folder_id), '')
self.log("going to switch to log %r" % suite_log_path)
self.log("going to switch to log %r", suite_log_path)
self.process_manager.log = self.log = self.getSuiteLog()
return suite_log_path
......@@ -237,7 +219,7 @@ develop = false
self.file_handler = logging.FileHandler(filename=suite_log_path)
self.file_handler.setFormatter(formatter)
logger.addHandler(self.file_handler)
logger.info('Activated logfile %r output' % suite_log_path)
logger.info('Activated logfile %r output', suite_log_path)
self.suite_log = logger.info
def checkRevision(self, test_result, node_test_suite):
......@@ -264,14 +246,13 @@ develop = false
node_test_suite.revision = test_result.revision
def _cleanupLog(self):
config = self.config
log_directory = self.config['log_directory']
now = time.time()
prune_time = time.time() - 86400 * self.max_log_time
for log_folder in os.listdir(log_directory):
folder_path = os.path.join(log_directory, log_folder)
if os.path.isdir(folder_path):
if (now - os.stat(folder_path).st_mtime)/86400 > self.max_log_time:
self.log("deleting log directory %r" % (folder_path,))
if os.stat(folder_path).st_mtime < prune_time:
self.log("deleting log directory %r", folder_path)
shutil.rmtree(folder_path)
def _cleanupTemporaryFiles(self):
......@@ -280,17 +261,16 @@ develop = false
missing disk space, remove old logs
"""
temp_directory = self.config["system_temp_folder"]
now = time.time()
user_id = os.geteuid()
prune_time = time.time() - 86400 * self.max_temp_time
for temp_folder in os.listdir(temp_directory):
folder_path = os.path.join(temp_directory, temp_folder)
if (temp_folder.startswith("tmp") or
temp_folder.startswith("buildout")):
try:
stat = os.stat(folder_path)
if stat.st_uid == user_id and \
(now - stat.st_mtime)/86400 > self.max_temp_time:
self.log("deleting temp directory %r" % (folder_path,))
if stat.st_uid == user_id and stat.st_mtime < prune_time:
self.log("deleting temp directory %r", folder_path)
if os.path.isdir(folder_path):
shutil.rmtree(folder_path)
else:
......@@ -298,9 +278,8 @@ develop = false
except OSError:
self.log("_cleanupTemporaryFiles exception", exc_info=sys.exc_info())
def cleanUp(self,test_result):
log = self.log
log('Testnode.cleanUp')
def cleanUp(self):
self.log('Testnode.cleanUp')
self.process_manager.killPreviousRun()
self._cleanupLog()
self._cleanupTemporaryFiles()
......@@ -308,19 +287,15 @@ develop = false
def run(self):
log = self.log
config = self.config
slapgrid = None
previous_revision_dict = {}
revision_dict = {}
test_result = None
test_node_slapos = SlapOSInstance()
test_node_slapos.edit(working_directory=self.config['slapos_directory'])
test_node_slapos.edit(working_directory=config['slapos_directory'])
try:
while True:
test_result = None
try:
node_test_suite = None
self.log = self.process_manager.log = self.testnode_log
self.cleanUp(None)
remote_test_result_needs_cleanup = False
self.cleanUp()
begin = time.time()
portal_url = config['test_suite_master_url']
self.taskdistribution = taskdistribution.TaskDistributor(
......@@ -328,25 +303,25 @@ develop = false
logger=DummyLogger(log))
node_configuration = self.taskdistribution.subscribeNode(node_title=config['test_node_title'],
computer_guid=config['computer_id'])
if type(node_configuration) == str:
if type(node_configuration) is str:
# Backward compatiblity
node_configuration = json.loads(node_configuration)
if node_configuration is not None and \
'process_timeout' in node_configuration \
and node_configuration['process_timeout'] is not None:
process_timeout = node_configuration['process_timeout']
log('Received and using process timeout from master: %i' % (
process_timeout))
log('Received and using process timeout from master: %i',
process_timeout)
self.process_manager.max_timeout = process_timeout
test_suite_data = self.taskdistribution.startTestSuite(
node_title=config['test_node_title'],
computer_guid=config['computer_id'])
if type(test_suite_data) == str:
if type(test_suite_data) is str:
# Backward compatiblity
test_suite_data = json.loads(test_suite_data)
test_suite_data = Utils.deunicodeData(test_suite_data)
log("Got following test suite data from master : %r" % \
(test_suite_data,))
test_suite_data = deunicodeData(test_suite_data)
log("Got following test suite data from master : %r",
test_suite_data)
try:
my_test_type = self.taskdistribution.getTestType()
except Exception:
......@@ -367,19 +342,18 @@ develop = false
# Clean-up test suites
self.checkOldTestSuite(test_suite_data)
for test_suite in test_suite_data:
remote_test_result_needs_cleanup = False
node_test_suite = self.getNodeTestSuite(
test_suite["test_suite_reference"])
node_test_suite.edit(
working_directory=self.config['working_directory'],
log_directory=self.config['log_directory'])
working_directory=config['working_directory'],
log_directory=config['log_directory'])
node_test_suite.edit(**test_suite)
if my_test_type == 'UnitTest':
runner = UnitTestRunner(node_test_suite)
elif my_test_type == 'ScalabilityTest':
runner = ScalabilityTestRunner(self)
runner = ScalabilityTestRunner(self)
else:
log("testnode, Runner type %s not implemented.", my_test_type)
raise NotImplementedError
......@@ -387,7 +361,6 @@ develop = false
# XXX: temporary hack to prevent empty test_suite
if not hasattr(node_test_suite, 'test_suite'):
node_test_suite.edit(test_suite='')
run_software = True
# kill processes from previous loop if any
self.process_manager.killPreviousRun()
revision_list = self.getAndUpdateFullRevisionList(node_test_suite)
......@@ -402,18 +375,17 @@ develop = false
config['test_node_title'], False,
node_test_suite.test_suite_title,
node_test_suite.project_title)
remote_test_result_needs_cleanup = True
log("testnode, test_result : %r" % (test_result, ))
log("testnode, test_result : %r", test_result)
if test_result is not None:
self.registerSuiteLog(test_result, node_test_suite)
self.checkRevision(test_result,node_test_suite)
node_test_suite.edit(test_result=test_result)
# get cluster configuration for this test suite, this is needed to
# know slapos parameters to user for creating instances
log("Getting configuration from test suite " + str(node_test_suite.test_suite_title))
log("Getting configuration from test suite %s", node_test_suite.test_suite_title)
generated_config = self.taskdistribution.generateConfiguration(node_test_suite.test_suite_title)
json_data = json.loads(generated_config)
cluster_configuration = Utils.deunicodeData(json_data['configuration_list'][0])
cluster_configuration = deunicodeData(json_data['configuration_list'][0])
node_test_suite.edit(cluster_configuration=cluster_configuration)
# Now prepare the installation of SlapOS and create instance
status_dict = runner.prepareSlapOSForTestSuite(node_test_suite)
......@@ -446,11 +418,12 @@ develop = false
# break the loop to get latest priorities from master
break
self.cleanUp(test_result)
self.cleanUp()
except (SubprocessError, CalledProcessError, ConnectionError) as e:
log("SubprocessError or ConnectionError : %r" % (e,), exc_info=sys.exc_info())
if remote_test_result_needs_cleanup:
status_dict = e.status_dict or {}
log("", exc_info=1)
if test_result is not None:
status_dict = getattr(e, "status_dict", None) or {
'stderr': "%s: %s" % (e.__class__.__name__, e)}
test_result.reportFailure(
command=status_dict.get('command'),
stdout=status_dict.get('stdout'),
......@@ -462,7 +435,7 @@ develop = false
log("ValueError : %r" % (e,), exc_info=sys.exc_info())
if node_test_suite is not None:
node_test_suite.retry_software_count += 1
if remote_test_result_needs_cleanup:
if test_result is not None:
test_result.reportFailure(
command='', stdout='',
stderr="ValueError was raised : %s" % (e,),
......@@ -472,16 +445,10 @@ develop = false
self.process_manager.under_cancellation = False
node_test_suite.retry = True
continue
except:
ex_type, ex, tb = sys.exc_info()
traceback.print_tb(tb)
log("erp5testnode exception", exc_info=sys.exc_info())
raise
now = time.time()
self.cleanUp(test_result)
if (now-begin) < 120:
sleep_time = 120 - (now-begin)
log("End of processing, going to sleep %s" % sleep_time)
self.cleanUp()
sleep_time = 120 - (time.time() - begin)
if sleep_time > 0:
log("End of processing, going to sleep %s", sleep_time)
time.sleep(sleep_time)
except Exception as e:
log("Exception in error handling : %r" % (e,), exc_info=sys.exc_info())
......@@ -492,5 +459,5 @@ develop = false
# groups working only in POSIX compilant systems
# Exceptions are swallowed during cleanup phase
log("GENERAL EXCEPTION, QUITING")
self.cleanUp(test_result)
self.cleanUp()
log("GENERAL EXCEPTION, QUITING, cleanup finished")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment