diff --git a/erp5/tests/testERP5TestNode.py b/erp5/tests/testERP5TestNode.py index 103258166981fbeb7cf09d7bb01beacc26ecdd65..950fbcc5ba0ddcd3e5ccdba923c015923b30454b 100644 --- a/erp5/tests/testERP5TestNode.py +++ b/erp5/tests/testERP5TestNode.py @@ -237,7 +237,7 @@ class ERP5TestNode(TestCase): node_test_suite.custom_profile_path) profile = open(node_test_suite.custom_profile_path, 'r') if my_test_type=='UnitTest': - expected_profile = """ + expected_profile = """\ [buildout] extends = %(temp_dir)s/testnode/foo/rep0/software.cfg @@ -258,7 +258,7 @@ shared = true else: revision1 = "azerty" revision2 = "qwerty" - expected_profile = """ + expected_profile = """\ [buildout] extends = %(temp_dir)s/testnode/foo/rep0/software.cfg @@ -802,17 +802,17 @@ shared = true test_node.node_test_suite_dict rand_part_set = set() self.assertEquals(2, len(test_node.node_test_suite_dict)) - assert(test_node.suite_log is not None) - assert(isinstance(test_node.suite_log, types.MethodType)) + self.assertIsNot(test_node.suite_log, None) + self.assertTrue(isinstance(test_node.suite_log, types.MethodType)) for ref, suite in test_node.node_test_suite_dict.items(): self.assertTrue('var/log/testnode/%s' % suite.reference in \ suite.suite_log_path, "Incorrect suite log path : %r" % suite.suite_log_path) - assert(suite.suite_log_path.endswith('suite.log')) + self.assertTrue(suite.suite_log_path.endswith('suite.log')) m = re.match('.*\-(.*)\/suite.log', suite.suite_log_path) rand_part = m.groups()[0] - assert(len(rand_part) == 32) - assert(rand_part not in rand_part_set) + self.assertEqual(len(rand_part), 32) + self.assertNotIn(rand_part, rand_part_set) rand_part_set.add(rand_part) suite_log = open(suite.suite_log_path, 'r') self.assertEquals(1, len([x for x in suite_log.readlines() \ diff --git a/erp5/util/testnode/NodeTestSuite.py b/erp5/util/testnode/NodeTestSuite.py index fcb2f305fa678f9e7ac244441f012190c70be136..91ad41fcb9a0d6d4c2c145dc432aef7fc39efe60 100644 --- a/erp5/util/testnode/NodeTestSuite.py +++ b/erp5/util/testnode/NodeTestSuite.py @@ -24,23 +24,13 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## -from datetime import datetime,timedelta +import errno import os -import subprocess -import sys -import time import glob -import SlapOSControler -import json -import time import shutil -import logging import string import random -from ProcessManager import SubprocessError, ProcessManager, CancellationError -from subprocess import CalledProcessError -from Updater import Updater -from erp5.util import taskdistribution +from .Utils import createFolder class SlapOSInstance(object): """ @@ -57,7 +47,7 @@ class SlapOSInstance(object): def _checkData(self): pass - + class NodeTestSuite(SlapOSInstance): """ @@ -67,22 +57,19 @@ class NodeTestSuite(SlapOSInstance): self.reference = reference self.cluster_configuration = {} - def edit(self, **kw): - super(NodeTestSuite, self).edit(**kw) - def _checkData(self): if getattr(self, "working_directory", None) is not None: if not(self.working_directory.endswith(os.path.sep + self.reference)): self.working_directory = os.path.join(self.working_directory, self.reference) - SlapOSControler.createFolder(self.working_directory) + createFolder(self.working_directory) self.test_suite_directory = os.path.join( self.working_directory, "test_suite") self.custom_profile_path = os.path.join(self.working_directory, 'software.cfg') if getattr(self, "vcs_repository_list", None) is not None: for vcs_repository in self.vcs_repository_list: - buildout_section_id = vcs_repository.get('buildout_section_id', None) + buildout_section_id = vcs_repository.get('buildout_section_id') repository_id = buildout_section_id or \ vcs_repository.get('url').split('/')[-1].split('.')[0] repository_path = os.path.join(self.working_directory,repository_id) @@ -92,20 +79,22 @@ class NodeTestSuite(SlapOSInstance): def createSuiteLog(self): # /srv/slapgrid/slappartXX/srv/var/log/testnode/az-mlksjfmlk234Sljssdflkj23KSdfslj/suite.log alphabets = string.digits + string.letters - rand_part = ''.join(random.choice(alphabets) for i in xrange(32)) - random_suite_folder_id = '%s-%s' % (self.reference, rand_part) - suite_log_directory = os.path.join(self.log_directory, - random_suite_folder_id) - SlapOSControler.createFolders(suite_log_directory) + while 1: + log_folder_name = '%s-%s' % (self.reference, + ''.join(random.choice(alphabets) for i in xrange(32))) + log_folder_path = os.path.join(self.log_directory, log_folder_name) + try: + os.makedirs(log_folder_path) + except OSError, e: + if e.errno != errno.EEXIST: + raise + else: + break # XXX copy the whole content of the log viewer app for fname in glob.glob(os.path.join(os.path.dirname(__file__), 'js-logtail', '*')): - shutil.copy(fname, suite_log_directory) - self.suite_log_path = os.path.join(suite_log_directory, - 'suite.log') - return self.getSuiteLogPath(), random_suite_folder_id - - def getSuiteLogPath(self): - return getattr(self,"suite_log_path", None) + shutil.copy(fname, log_folder_path) + self.suite_log_path = os.path.join(log_folder_path, 'suite.log') + return self.suite_log_path, log_folder_name @property def revision(self): diff --git a/erp5/util/testnode/ProcessManager.py b/erp5/util/testnode/ProcessManager.py index 39712712e343a23320ab58178fe679b648752ccd..a566bd3a8d18d2715869e4b5b9045e7a74abc762 100644 --- a/erp5/util/testnode/ProcessManager.py +++ b/erp5/util/testnode/ProcessManager.py @@ -131,14 +131,14 @@ class ProcessManager(object): stdin = file(os.devnull) - def __init__(self, log, *args, **kw): + def __init__(self, log, max_timeout=MAX_TIMEOUT): self.log = log self.process_pid_set = set() signal.signal(signal.SIGTERM, self.sigterm_handler) self.under_cancellation = False self.p = None self.result = None - self.max_timeout = kw.get("max_timeout") or MAX_TIMEOUT + self.max_timeout = max_timeout self.timer_set = set() def spawn(self, *args, **kw): @@ -188,7 +188,7 @@ class ProcessManager(object): return result def getSupportedParameterList(self, program_path): - return re.findall('^ (--\w+)', + return re.findall(r'^ (--\w+)', self.spawn(program_path, '--help')['stdout'], re.M) def killall(self, name): @@ -212,13 +212,15 @@ class ProcessManager(object): continue except (psutil.AccessDenied, psutil.NoSuchProcess): continue - self.log('ProcesssManager, killall on %s having pid %s' % (name, process.pid)) + self.log('ProcesssManager, killall on %s having pid %s', + name, process.pid) to_kill_list.append(process.pid) for pid in to_kill_list: killCommand(pid, self.log) def killPreviousRun(self, cancellation=False): - self.log('ProcessManager killPreviousRun, going to kill %r' % (self.process_pid_set,)) + self.log('ProcessManager killPreviousRun, going to kill %r', + self.process_pid_set) if cancellation: self.under_cancellation = True for timer in self.timer_set: @@ -227,12 +229,13 @@ class ProcessManager(object): killCommand(pgpid, self.log) try: if os.path.exists(self.supervisord_pid_file): - supervisor_pid = int(open(self.supervisord_pid_file).read().strip()) - self.log('ProcessManager killPreviousRun, going to kill supervisor with pid %r' % supervisor_pid) + with open(self.supervisord_pid_file) as f: + supervisor_pid = int(f.read().strip()) + self.log('ProcessManager killPreviousRun, going to kill supervisor with pid %r', + supervisor_pid) os.kill(supervisor_pid, signal.SIGTERM) - except: + except Exception: self.log('ProcessManager killPreviousRun, exception when killing supervisor') - pass self.process_pid_set.clear() def sigterm_handler(self, signal, frame): diff --git a/erp5/util/testnode/SlapOSControler.py b/erp5/util/testnode/SlapOSControler.py index 58093bd7de0cdaec728681da59eab5331aad5989..b1973fac9d2484fa43dcce0c8318a05f0ed66581 100644 --- a/erp5/util/testnode/SlapOSControler.py +++ b/erp5/util/testnode/SlapOSControler.py @@ -30,40 +30,14 @@ import slapos.slap import subprocess import time import xml_marshaller -import shutil import sys -import glob import argparse -import json from slapos import client +from .Utils import createFolder -MAX_PARTIONS = 10 +MAX_PARTITIONS = 10 MAX_SR_RETRIES = 3 -def createFolder(folder, clean=False): - if clean and os.path.exists(folder): - shutil.rmtree(folder) - if not(os.path.exists(folder)): - os.mkdir(folder) - -def createFolders(folder): - if not(os.path.exists(folder)): - os.makedirs(folder) - -def isDir(folder): - return os.path.isdir(folder) - -def createFile(path, mode, content): - f = open(path, mode) - if os.path.exists(path): - f.write(content) - f.close() - else: - # error - pass - - - class SlapOSControler(object): def __init__(self, working_directory, config, log): @@ -91,9 +65,12 @@ class SlapOSControler(object): slapos_url, slapos_account_certificate_path, slapos_account_key_path) - createFile(slapos_account_key_path, "w", key) - createFile(slapos_account_certificate_path, "w", certificate) - createFile(configuration_file_path, "w", configuration_file_value) + with open(slapos_account_key_path, "w") as f: + f.write(key) + with open(slapos_account_certificate_path, "w") as f: + f.write(certificate) + with open(configuration_file_path, "w") as f: + f.write(configuration_file_value) self.configuration_file_path = configuration_file_path return slapos_account_key_path, slapos_account_certificate_path, configuration_file_path @@ -115,26 +92,17 @@ class SlapOSControler(object): try: local = client.init(config) local['supply'](software_url, computer_guid=computer_id, state=state) - self.log('SlapOSControler : supply %s %s %s' %(software_url, computer_id, state)) - except: - self.log("SlapOSControler.supply, \ - exception in registerOpenOrder", exc_info=sys.exc_info()) + self.log('SlapOSControler : supply %s %s %s', software_url, computer_id, state) + except Exception: + self.log("SlapOSControler.supply", exc_info=sys.exc_info()) raise ValueError("Unable to supply (or remove)") else: raise ValueError("Configuration file not found.") - def destroy(self, software_url, computer_id): - """ - Request Deletetion of a software release on a specific node - Ex : - my_controler.destroy('kvm.cfg', 'COMP-726') - """ - self.supply(self, software_url, computer_id, state="destroyed") - def getInstanceRequestedState(self, reference): try: return self.instance_config[reference]['requested_state'] - except: + except Exception: raise ValueError("Instance '%s' not exist" %self.instance_config[reference]) def request(self, reference, software_url, software_type=None, @@ -182,11 +150,11 @@ class SlapOSControler(object): self.instance_config[reference]['partition'] = partition if state == 'destroyed': del self.instance_config[reference] - if state == 'started': - self.log('Instance started with configuration: %s' %str(software_configuration)) - except: - self.log("SlapOSControler.request, \ - exception in registerOpenOrder", exc_info=sys.exc_info()) + elif state == 'started': + self.log('Instance started with configuration: %s', + software_configuration) + except Exception: + self.log("SlapOSControler.request", exc_info=sys.exc_info()) raise ValueError("Unable to do this request") else: raise ValueError("Configuration file not found.") @@ -204,31 +172,30 @@ class SlapOSControler(object): self.log('SlapOSControler : delete instance') try: self._requestSpecificState(reference, 'destroyed') - except: - raise ValueError("Can't delete instance '%s' (instance may not been created?)" %reference) + except Exception: + raise ValueError("Can't delete instance %r (instance not created?)" % reference) def stopInstance(self, reference): self.log('SlapOSControler : stop instance') try: self._requestSpecificState(reference, 'stopped') - except: - raise ValueError("Can't stop instance '%s' (instance may not been created?)" %reference) + except Exception: + raise ValueError("Can't stop instance %r (instance not created?)" % reference) def startInstance(self, reference): self.log('SlapOSControler : start instance') try: self._requestSpecificState(reference, 'started') - except: - raise ValueError("Can't start instance '%s' (instance may not been created?)" %reference) + except Exception: + raise ValueError("Can't start instance %r (instance not created?)" % reference) def updateInstanceXML(self, reference, software_configuration): """ Update the XML configuration of an instance # Request same instance with different parameters. """ - self.log('SlapOSControler : updateInstanceXML') self.log('SlapOSControler : updateInstanceXML will request same' - 'instance with new XML configuration...') + ' instance with new XML configuration...') try: self.request(reference, @@ -238,32 +205,28 @@ class SlapOSControler(object): self.instance_config[reference]['computer_guid'], state='started' ) - except: + except Exception: raise ValueError("Can't update instance '%s' (may not exist?)" %reference) def _resetSoftware(self): - self.log('SlapOSControler : GOING TO RESET ALL SOFTWARE : %r' % - (self.software_root,)) - if os.path.exists(self.software_root): - shutil.rmtree(self.software_root) - os.mkdir(self.software_root) - os.chmod(self.software_root, 0750) + self.log('SlapOSControler : GOING TO RESET ALL SOFTWARE : %r', + self.software_root) + createFolder(self.software_root, True) def initializeSlapOSControler(self, slapproxy_log=None, process_manager=None, reset_software=False, software_path_list=None): self.process_manager = process_manager self.software_path_list = software_path_list - self.log('SlapOSControler, initialize, reset_software: %r' % reset_software) + self.log('SlapOSControler, initialize, reset_software: %r', reset_software) config = self.config slapos_config_dict = self.config.copy() slapos_config_dict.update(software_root=self.software_root, instance_root=self.instance_root, proxy_database=self.proxy_database) - open(self.slapos_config, 'w').write(pkg_resources.resource_string( + with open(self.slapos_config, 'w') as f: + f.write(pkg_resources.resource_string( 'erp5.util.testnode', 'template/slapos.cfg.in') % slapos_config_dict) - createFolder(self.software_root) - createFolder(self.instance_root) # By erasing everything, we make sure that we are able to "update" # existing profiles. This is quite dirty way to do updates... if os.path.exists(self.proxy_database): @@ -282,32 +245,29 @@ class SlapOSControler(object): # connections time.sleep(20) try: - slap = slapos.slap.slap() - self.slap = slap - self.slap.initializeConnection(config['master_url']) + slap = self.slap = slapos.slap.slap() + slap.initializeConnection(config['master_url']) # register software profile for path in self.software_path_list: slap.registerSupply().supply( path, computer_guid=config['computer_id']) computer = slap.registerComputer(config['computer_id']) - except: - self.log("SlapOSControler.initializeSlapOSControler, \ - exception in registerSupply", exc_info=sys.exc_info()) + except Exception: + self.log("SlapOSControler.initializeSlapOSControler", + exc_info=sys.exc_info()) raise ValueError("Unable to registerSupply") # Reset all previously generated software if needed if reset_software: self._resetSoftware() + else: + createFolder(self.software_root) instance_root = self.instance_root - if os.path.exists(instance_root): - # delete old paritions which may exists in order to not get its data - # (ex. MySQL db content) from previous testnode's runs - # In order to be able to change partition naming scheme, do this at - # instance_root level (such change happened already, causing problems). - shutil.rmtree(instance_root) - if not(os.path.exists(instance_root)): - os.mkdir(instance_root) - for i in range(0, MAX_PARTIONS): + # Delete any existing partition in order to not get its data (ex. + # MySQL DB content) from previous runs. To support changes of partition + # naming scheme (which already happened), do this at instance_root level. + createFolder(instance_root, True) + for i in xrange(MAX_PARTITIONS): # create partition and configure computer # XXX: at the moment all partitions do share same virtual interface address # this is not a problem as usually all services are on different ports @@ -336,17 +296,17 @@ class SlapOSControler(object): def runSoftwareRelease(self, config, environment, **kw): self.log("SlapOSControler.runSoftwareRelease") - cpu_count = os.sysconf("SC_NPROCESSORS_ONLN") - os.putenv('MAKEFLAGS', '-j%s' % cpu_count) - os.putenv('NPY_NUM_BUILD_JOBS', '%s' % cpu_count) - os.putenv('BUNDLE_JOBS', '%s' % cpu_count) + cpu_count = str(os.sysconf("SC_NPROCESSORS_ONLN")) + os.environ['MAKEFLAGS'] = '-j' + cpu_count + os.environ['NPY_NUM_BUILD_JOBS'] = cpu_count + os.environ['BUNDLE_JOBS'] = cpu_count os.environ['PATH'] = environment['PATH'] # a SR may fail for number of reasons (incl. network failures) # so be tolerant and run it a few times before giving up - for runs in range(0, MAX_SR_RETRIES): + for _ in xrange(MAX_SR_RETRIES): status_dict = self.spawn(config['slapos_binary'], 'node', 'software', '--all', - '--pidfile', '%s/software.pid' % self.software_root, + '--pidfile', os.path.join(self.software_root, 'slapos-node.pid'), '--cfg', self.slapos_config, raise_error_if_fail=False, log_prefix='slapgrid_sr', get_output=False) if status_dict['status_code'] == 0: @@ -355,32 +315,32 @@ class SlapOSControler(object): def runComputerPartition(self, config, environment, stdout=None, stderr=None, cluster_configuration=None, **kw): - self.log("SlapOSControler.runComputerPartition with cluster_config: %r" % (cluster_configuration,)) + self.log("SlapOSControler.runComputerPartition with cluster_config: %r", + cluster_configuration) for path in self.software_path_list: try: self.slap.registerOpenOrder().request(path, partition_reference='testing partition %s' % \ self.software_path_list.index(path), partition_parameter_kw=cluster_configuration) - except: - self.log("SlapOSControler.runComputerPartition, \ - exception in registerOpenOrder", exc_info=sys.exc_info()) + except Exception: + self.log("SlapOSControler.runComputerPartition", exc_info=sys.exc_info()) raise ValueError("Unable to registerOpenOrder") # try to run for all partitions as one partition may in theory request another one # this not always is required but curently no way to know how "tree" of partitions # may "expand" - sleep_time = 0 - for runs in range(0, MAX_PARTIONS): + for _ in xrange(MAX_PARTITIONS): status_dict = self.spawn(config['slapos_binary'], 'node', 'instance', - '--pidfile', '%s/instance.pid' % self.software_root, + '--pidfile', os.path.join(self.instance_root, 'slapos-node.pid'), '--cfg', self.slapos_config, raise_error_if_fail=False, log_prefix='slapgrid_cp', get_output=False) - self.log('slapgrid_cp status_dict : %r' % (status_dict,)) - if status_dict['status_code'] in (0,): + self.log('slapgrid_cp status_dict : %r', status_dict) + if not status_dict['status_code']: break - # some hack to handle promise issues (should be only one of the two - # codes, but depending on slapos versions, we have inconsistent status - if status_dict['status_code'] in (1,2): - status_dict['status_code'] = 0 + else: + # some hack to handle promise issues (should be only one of the two + # codes, but depending on slapos versions, we have inconsistent status + if status_dict['status_code'] in (1,2): + status_dict['status_code'] = 0 return status_dict diff --git a/erp5/util/testnode/SlapOSMasterCommunicator.py b/erp5/util/testnode/SlapOSMasterCommunicator.py index edd08d6fa091a97df20df87b9da6ab817eacf590..2c37804c544c55ce569ac0570fa6ae54f71ba08c 100644 --- a/erp5/util/testnode/SlapOSMasterCommunicator.py +++ b/erp5/util/testnode/SlapOSMasterCommunicator.py @@ -1,17 +1,15 @@ import datetime import json -import sys import traceback import time #import feedparser +from functools import wraps from uritemplate import expand import slapos.slap from slapos.slap import SoftwareProductCollection - -from slapos.slap.slap import ConnectionError from requests.exceptions import HTTPError -from erp5.util.taskdistribution import SAFE_RPC_EXCEPTION_LIST +from ..taskdistribution import SAFE_RPC_EXCEPTION_LIST # max time to instance changing state: 2 hour MAX_INSTANCE_TIME = 60*60*2 @@ -41,36 +39,29 @@ TESTER_STATE_INSTANCE_UNINSTALLED = "TESTER_STATE_INSTANCE_UNINSTALLED" # Simple decorator to prevent raise due small # network failures. -def retryOnNetworkFailure(func): - def wrapper(*args, **kwargs): +def retryOnNetworkFailure(func, + _except_list = SAFE_RPC_EXCEPTION_LIST + ( + HTTPError, slapos.slap.ConnectionError), + ): + def wrapper(*args, **kw): retry_time = 64 while True: try: - return func(*args, **kwargs) - except SAFE_RPC_EXCEPTION_LIST, e: - print 'Network failure: %s , %s' % (sys.exc_info(), e) - except HTTPError, e: - print 'Network failure: %s , %s' % (sys.exc_info(), e) - except ConnectionError, e: - print 'Network failure: %s , %s' % (sys.exc_info(), e) - except slapos.slap.ConnectionError, e: - print 'Network failure: %s , %s' % (sys.exc_info(), e) - - print 'Retry method %s in %i seconds' % (func, retry_time) + return func(*args, **kw) + except _except_list: + traceback.print_exc() + + print 'Network failure. Retry method %s in %i seconds' % (func, retry_time) time.sleep(retry_time) retry_time = min(retry_time*1.5, 640) - - wrapper.__name__ = func.__name__ - wrapper.__doc__ = func.__doc__ - return wrapper + return wraps(func)(wrapper) class SlapOSMasterCommunicator(object): latest_state = None def __init__(self, slap, slap_supply, slap_order, url, logger): - self._logger = logger self.slap = slap self.slap_order = slap_order @@ -102,8 +93,7 @@ class SlapOSMasterCommunicator(object): if instance_title is not None: self.name = instance_title if request_kw is not None: - if isinstance(request_kw, str) or \ - isinstance(request_kw, unicode): + if isinstance(request_kw, basestring): self.request_kw = json.loads(request_kw) else: self.request_kw = request_kw @@ -116,12 +106,11 @@ class SlapOSMasterCommunicator(object): **self.request_kw) def isInstanceRequested(self, instance_title): - hateoas = getattr(self.slap, '_hateoas_navigator', None) + hateoas = self._hateoas_navigator return instance_title in hateoas.getHostingSubscriptionDict() @retryOnNetworkFailure def _hateoas_getComputer(self, reference): - root_document = self.hateoas_navigator.getRootDocument() search_url = root_document["_links"]['raw_search']['href'] @@ -147,7 +136,6 @@ class SlapOSMasterCommunicator(object): @retryOnNetworkFailure def getSoftwareInstallationList(self): # XXX Move me to slap.py API - computer = self._hateoas_getComputer(self.computer_guid) # Not a list ? @@ -191,7 +179,6 @@ class SlapOSMasterCommunicator(object): @retryOnNetworkFailure def getInstanceUrlList(self): - if self.hosting_subscription_url is None: hosting_subscription_dict = self.hateoas_navigator._hateoas_getHostingSubscriptionDict() for hs in hosting_subscription_dict: @@ -207,7 +194,6 @@ class SlapOSMasterCommunicator(object): @retryOnNetworkFailure def getNewsFromInstance(self, url): - result = self.hateoas_navigator.GET(url) result = json.loads(result) if result['_links'].get('action_object_slap', None) is None: @@ -221,7 +207,6 @@ class SlapOSMasterCommunicator(object): @retryOnNetworkFailure def getInformationFromInstance(self, url): - result = self.hateoas_navigator.GET(url) result = json.loads(result) if result['_links'].get('action_object_slap', None) is None: @@ -329,7 +314,7 @@ class SlapOSMasterCommunicator(object): self._logger('Got an error requesting partition for ' 'its state') return INSTANCE_STATE_UNKNOWN - except: + except Exception: self._logger("ERROR getting instance state") return INSTANCE_STATE_UNKNOWN @@ -377,6 +362,7 @@ class SlapOSMasterCommunicator(object): return {'error_message' : None} class SlapOSTester(SlapOSMasterCommunicator): + def __init__(self, name, logger, @@ -387,7 +373,6 @@ class SlapOSTester(SlapOSMasterCommunicator): computer_guid=None, # computer for supply if desired request_kw=None ): - super(SlapOSTester, self).__init__( slap, slap_supply, slap_order, url, logger) @@ -464,7 +449,6 @@ class SoftwareReleaseTester(SlapOSTester): software_timeout=3600, instance_timeout=3600, ): - super(SoftwareReleaseTester, self).__init__( name, logger, slap, slap_order, slap_supply, url, computer_guid, request_kw) diff --git a/erp5/util/testnode/UnitTestRunner.py b/erp5/util/testnode/UnitTestRunner.py index c1a3fff63ecd7f669d293af49d66f010c4fc1a20..293e99e4b62471d324ac56e034f052d6b5377e2f 100644 --- a/erp5/util/testnode/UnitTestRunner.py +++ b/erp5/util/testnode/UnitTestRunner.py @@ -24,28 +24,22 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## -from datetime import datetime,timedelta import os -import subprocess -import sys -import time import glob -import SlapOSControler import json -import time -import shutil -import logging -import string -import random -from ProcessManager import SubprocessError, ProcessManager, CancellationError -from subprocess import CalledProcessError -from NodeTestSuite import SlapOSInstance -from Updater import Updater -from Utils import dealShebang -from erp5.util import taskdistribution +from .ProcessManager import SubprocessError +from .SlapOSControler import SlapOSControler +from .Utils import createFolder from slapos.grid.utils import md5digest -class UnitTestRunner(): +def dealShebang(run_test_suite_path): + with open(run_test_suite_path) as f: + if f.read(2) == '#!': + return f.readline().split(None, 1) + return [] + +class UnitTestRunner(object): + def __init__(self, testnode): self.testnode = testnode @@ -53,12 +47,11 @@ class UnitTestRunner(): """ Create a SlapOSControler """ - return SlapOSControler.SlapOSControler( + return SlapOSControler( working_directory, self.testnode.config, self.testnode.log) - def _prepareSlapOS(self, working_directory, slapos_instance, log, create_partition=1, software_path_list=None, **kw): """ @@ -66,11 +59,11 @@ class UnitTestRunner(): """ slapproxy_log = os.path.join(self.testnode.config['log_directory'], 'slapproxy.log') - log('Configured slapproxy log to %r' % slapproxy_log) + log('Configured slapproxy log to %r', slapproxy_log) reset_software = slapos_instance.retry_software_count > 10 if reset_software: slapos_instance.retry_software_count = 0 - log('testnode, retry_software_count : %r' % \ + log('testnode, retry_software_count : %r', slapos_instance.retry_software_count) # XXX Create a new controler because working_directory can be @@ -108,26 +101,22 @@ class UnitTestRunner(): """ # report-url, report-project and suite-url are required to seleniumrunner # instance. This is a hack which must be removed. - cluster_configuration = {} config = self.testnode.config - cluster_configuration['report-url'] = config.get("report-url", "") - cluster_configuration['report-project'] = config.get("report-project", "") - cluster_configuration['suite-url'] = config.get("suite-url", "") - return self._prepareSlapOS(self.testnode.config['slapos_directory'], + return self._prepareSlapOS(config['slapos_directory'], test_node_slapos, self.testnode.log, create_partition=0, - software_path_list=self.testnode.config.get("software_list"), - cluster_configuration=cluster_configuration - ) + software_path_list=config.get("software_list"), + cluster_configuration={ + 'report-url': config.get("report-url", ""), + 'report-project': config.get("report-project", ""), + 'suite-url': config.get("suite-url", ""), + }) def prepareSlapOSForTestSuite(self, node_test_suite): """ Build softwares needed by testsuites """ - log = self.testnode.log - if log is None: - log = self.testnode.log return self._prepareSlapOS(node_test_suite.working_directory, - node_test_suite, log, + node_test_suite, self.testnode.log, software_path_list=[node_test_suite.custom_profile_path], cluster_configuration={'_': json.dumps(node_test_suite.cluster_configuration)}) @@ -171,8 +160,7 @@ class UnitTestRunner(): # From this point, test runner becomes responsible for updating test # result. We only do cleanup if the test runner itself is not able # to run. - SlapOSControler.createFolder(node_test_suite.test_suite_directory, - clean=True) + createFolder(node_test_suite.test_suite_directory, clean=True) self.testnode.process_manager.spawn(*invocation_list, cwd=node_test_suite.test_suite_directory, log_prefix='runTestSuite', get_output=False) diff --git a/erp5/util/testnode/Updater.py b/erp5/util/testnode/Updater.py index d7b62a61757126084ec27667806a9ccfec25744a..ece92e465ec7748a46b10067597118c0389aa484 100644 --- a/erp5/util/testnode/Updater.py +++ b/erp5/util/testnode/Updater.py @@ -28,11 +28,8 @@ import errno import os import re import shutil -import subprocess import sys -import threading - -from ProcessManager import SubprocessError +from .ProcessManager import SubprocessError SVN_UP_REV = re.compile(r'^(?:At|Updated to) revision (\d+).$') SVN_CHANGED_REV = re.compile(r'^Last Changed Rev.*:\s*(\d+)', re.MULTILINE) @@ -44,9 +41,8 @@ SVN_TYPE = 'svn' class Updater(object): _git_cache = {} - stdin = file(os.devnull) - def __init__(self, repository_path, log, revision=None, git_binary=None, + def __init__(self, repository_path, log, revision=None, git_binary='git', branch=None, realtime_output=True, process_manager=None, url=None, working_directory=None): self.log = log @@ -81,7 +77,7 @@ class Updater(object): def deletePycFiles(self, path): """Delete *.pyc files so that deleted/moved files can not be imported""" - for path, dir_list, file_list in os.walk(path): + for path, _, file_list in os.walk(path): for file in file_list: if file[-4:] in ('.pyc', '.pyo'): # allow several processes clean the same folder at the same time @@ -115,17 +111,17 @@ class Updater(object): git_repository_path = os.path.join(self.getRepositoryPath(), '.git') name = os.path.basename(os.path.normpath(self.getRepositoryPath())) git_repository_link_path = os.path.join(self.getRepositoryPath(), '%s.git' %name) - self.log("checking link %s -> %s.." - %(git_repository_link_path,git_repository_path)) + self.log("checking link %s -> %s..", + git_repository_link_path, git_repository_path) if ( not os.path.lexists(git_repository_link_path) and \ not os.path.exists(git_repository_link_path) ): try: os.symlink(git_repository_path, git_repository_link_path) - self.log("link: %s -> %s created" - %(git_repository_link_path,git_repository_path)) - except: - self.log("Cannot create link from %s -> %s" - %(git_repository_link_path,git_repository_path)) + self.log("link: %s -> %s created", + git_repository_link_path, git_repository_path) + except OSError: + self.log("Cannot create link from %s -> %s", + git_repository_link_path, git_repository_path) def _git_find_rev(self, ref): try: @@ -152,7 +148,7 @@ class Updater(object): raise NotImplementedError def deleteRepository(self): - self.log("Wrong repository or wrong url, deleting repos %s" % \ + self.log("Wrong repository or wrong url, deleting repos %s", self.repository_path) shutil.rmtree(self.repository_path) @@ -165,7 +161,7 @@ class Updater(object): remote_url = self._git("config", "--get", "remote.origin.url") if remote_url == self.url: correct_url = True - except (SubprocessError,) as e: + except SubprocessError: self.log("SubprocessError", exc_info=sys.exc_info()) if not(correct_url): self.deleteRepository() diff --git a/erp5/util/testnode/Utils.py b/erp5/util/testnode/Utils.py index 01d30d1c4a97931d155ae8fe3cf896ad846aefcb..bc3fbcf4ba0ea72d2013ce3603d4d43843ea30e0 100644 --- a/erp5/util/testnode/Utils.py +++ b/erp5/util/testnode/Utils.py @@ -1,29 +1,19 @@ -import sys -import json +import os import shutil -import string -from random import choice + +def createFolder(folder, clean=False): + if os.path.exists(folder): + if not clean: + return + shutil.rmtree(folder) + os.mkdir(folder) def deunicodeData(data): if isinstance(data, list): - new_data = [] - for sub_data in data: - new_data.append(deunicodeData(sub_data)) - elif isinstance(data, unicode): - new_data = data.encode('utf8') - elif isinstance(data, dict): - new_data = {} - for key, value in data.iteritems(): - key = deunicodeData(key) - value = deunicodeData(value) - new_data[key] = value - else: - new_data = data - return new_data - -def dealShebang(run_test_suite_path): - line = open(run_test_suite_path, 'r').readline() - invocation_list = [] - if line[:2] == '#!': - invocation_list = line[2:].split() - return invocation_list + return map(deunicodeData, data) + if isinstance(data, unicode): + return data.encode('utf8') + if isinstance(data, dict): + return {deunicodeData(key): deunicodeData(value) + for key, value in data.iteritems()} + return data diff --git a/erp5/util/testnode/testnode.py b/erp5/util/testnode/testnode.py index e3d17b41f695b82282a9108f1413e4dbf6bd9cef..6274b9a8ae251fcc2b0954318f3c3d392160618c 100644 --- a/erp5/util/testnode/testnode.py +++ b/erp5/util/testnode/testnode.py @@ -26,30 +26,24 @@ ############################################################################## import os import sys -import time import json import time import shutil import logging -import Utils from slapos.slap.slap import ConnectionError -import traceback - -from ProcessManager import SubprocessError, ProcessManager, CancellationError +from .ProcessManager import SubprocessError, ProcessManager, CancellationError from subprocess import CalledProcessError -from Updater import Updater -from NodeTestSuite import NodeTestSuite, SlapOSInstance -from ScalabilityTestRunner import ScalabilityTestRunner -from UnitTestRunner import UnitTestRunner -from erp5.util import taskdistribution +from .Updater import Updater +from .NodeTestSuite import NodeTestSuite, SlapOSInstance +from .ScalabilityTestRunner import ScalabilityTestRunner +from .UnitTestRunner import UnitTestRunner +from .Utils import deunicodeData +from .. import taskdistribution - -DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep MAX_LOG_TIME = 15 # time in days we should keep logs that we can see through # httd MAX_TEMP_TIME = 0.01 # time in days we should keep temp files -supervisord_pid_file = None PROFILE_PATH_KEY = 'profile_path' @@ -76,7 +70,6 @@ class TestNode(object): def checkOldTestSuite(self,test_suite_data): - config = self.config installed_reference_set = set(os.listdir(self.working_directory)) wished_reference_set = set([x['test_suite_reference'] for x in test_suite_data]) to_remove_reference_set = installed_reference_set.difference( @@ -84,11 +77,11 @@ class TestNode(object): for y in to_remove_reference_set: fpath = os.path.join(self.working_directory,y) self.delNodeTestSuite(y) - self.log("testnode.checkOldTestSuite, DELETING : %r" % (fpath,)) + self.log("testnode.checkOldTestSuite, DELETING : %r", fpath) if os.path.isdir(fpath): - shutil.rmtree(fpath) + shutil.rmtree(fpath) else: - os.remove(fpath) + os.remove(fpath) def getNodeTestSuite(self, reference): node_test_suite = self.node_test_suite_dict.get(reference) @@ -103,27 +96,22 @@ class TestNode(object): return node_test_suite def delNodeTestSuite(self, reference): - if self.node_test_suite_dict.has_key(reference): - self.node_test_suite_dict.pop(reference) + self.node_test_suite_dict.pop(reference, None) def constructProfile(self, node_test_suite, test_type, use_relative_path=False): - config = self.config - profile_content = '' assert len(node_test_suite.vcs_repository_list), "we must have at least one repository" - profile_path_count = 0 + software_config_path = None profile_content_list = [] revision_dict = dict(node_test_suite.revision_list) for vcs_repository in node_test_suite.vcs_repository_list: - url = vcs_repository['url'] - buildout_section_id = vcs_repository.get('buildout_section_id', None) + buildout_section_id = vcs_repository.get('buildout_section_id') repository_path = vcs_repository['repository_path'] try: profile_path = vcs_repository[PROFILE_PATH_KEY] except KeyError: pass else: - profile_path_count += 1 - if profile_path_count > 1: + if software_config_path is not None: raise ValueError(PROFILE_PATH_KEY + ' defined more than once') # Absolute path to relative path @@ -133,11 +121,6 @@ class TestNode(object): node_test_suite.reference) software_config_path = os.path.relpath(software_config_path, from_path) - profile_content_list.append(""" -[buildout] -extends = %(software_config_path)s -""" % {'software_config_path': software_config_path}) - # Construct sections if not(buildout_section_id is None): # Absolute path to relative @@ -168,14 +151,14 @@ shared = true """ % {'buildout_section_id': buildout_section_id, 'repository_path' : repository_path, 'branch' : vcs_repository.get('branch','master')}) - if not profile_path_count: + if software_config_path is None: raise ValueError(PROFILE_PATH_KEY + ' not defined') + profile_content_list.sort() # Write file - custom_profile = open(node_test_suite.custom_profile_path, 'w') - # sort to have buildout section first - profile_content_list.sort(key=lambda x: [x, ''][x.startswith('\n[buildout]')]) - custom_profile.write(''.join(profile_content_list)) - custom_profile.close() + with open(node_test_suite.custom_profile_path, 'w') as f: + f.write("[buildout]\nextends = %s\n%s" % ( + software_config_path, + ''.join(profile_content_list))) sys.path.append(repository_path) def updateRevisionList(self, node_test_suite): @@ -194,7 +177,7 @@ shared = true url=vcs_repository["url"]) updater.checkout() revision_list.append((repository_id, updater.getRevision())) - except SubprocessError, e: + except SubprocessError: log("Error while getting repository, ignoring this test suite", exc_info=1) return False @@ -211,7 +194,7 @@ shared = true # TODO make the path into url test_result.reportStatus('LOG url', "%s/%s" % (self.config.get('httpd_url'), folder_id), '') - self.log("going to switch to log %r" % suite_log_path) + self.log("going to switch to log %r", suite_log_path) self.process_manager.log = self.log = self.getSuiteLog() return suite_log_path @@ -230,7 +213,7 @@ shared = true self.file_handler = logging.FileHandler(filename=suite_log_path) self.file_handler.setFormatter(formatter) logger.addHandler(self.file_handler) - logger.info('Activated logfile %r output' % suite_log_path) + logger.info('Activated logfile %r output', suite_log_path) self.suite_log = logger.info def checkRevision(self, test_result, node_test_suite): @@ -257,14 +240,13 @@ shared = true node_test_suite.revision_list = revision_list def _cleanupLog(self): - config = self.config log_directory = self.config['log_directory'] - now = time.time() + prune_time = time.time() - 86400 * self.max_log_time for log_folder in os.listdir(log_directory): folder_path = os.path.join(log_directory, log_folder) if os.path.isdir(folder_path): - if (now - os.stat(folder_path).st_mtime)/86400 > self.max_log_time: - self.log("deleting log directory %r" % (folder_path,)) + if os.stat(folder_path).st_mtime < prune_time: + self.log("deleting log directory %r", folder_path) shutil.rmtree(folder_path) def _cleanupTemporaryFiles(self): @@ -273,17 +255,16 @@ shared = true missing disk space, remove old logs """ temp_directory = self.config["system_temp_folder"] - now = time.time() user_id = os.geteuid() + prune_time = time.time() - 86400 * self.max_temp_time for temp_folder in os.listdir(temp_directory): folder_path = os.path.join(temp_directory, temp_folder) if (temp_folder.startswith("tmp") or temp_folder.startswith("buildout")): try: stat = os.stat(folder_path) - if stat.st_uid == user_id and \ - (now - stat.st_mtime)/86400 > self.max_temp_time: - self.log("deleting temp directory %r" % (folder_path,)) + if stat.st_uid == user_id and stat.st_mtime < prune_time: + self.log("deleting temp directory %r", folder_path) if os.path.isdir(folder_path): shutil.rmtree(folder_path) else: @@ -291,9 +272,8 @@ shared = true except OSError: self.log("_cleanupTemporaryFiles exception", exc_info=1) - def cleanUp(self,test_result): - log = self.log - log('Testnode.cleanUp') + def cleanUp(self): + self.log('Testnode.cleanUp') self.process_manager.killPreviousRun() self._cleanupLog() self._cleanupTemporaryFiles() @@ -301,19 +281,15 @@ shared = true def run(self): log = self.log config = self.config - slapgrid = None - previous_revision_dict = {} - revision_dict = {} - test_result = None test_node_slapos = SlapOSInstance() - test_node_slapos.edit(working_directory=self.config['slapos_directory']) + test_node_slapos.edit(working_directory=config['slapos_directory']) try: while True: + test_result = None try: node_test_suite = None self.log = self.process_manager.log = self.testnode_log - self.cleanUp(None) - remote_test_result_needs_cleanup = False + self.cleanUp() begin = time.time() portal_url = config['test_suite_master_url'] self.taskdistribution = taskdistribution.TaskDistributor( @@ -321,23 +297,23 @@ shared = true logger=DummyLogger(log)) node_configuration = self.taskdistribution.subscribeNode(node_title=config['test_node_title'], computer_guid=config['computer_id']) - if type(node_configuration) == str: + if type(node_configuration) is str: # Backward compatiblity node_configuration = json.loads(node_configuration) if node_configuration is not None and \ 'process_timeout' in node_configuration \ and node_configuration['process_timeout'] is not None: process_timeout = node_configuration['process_timeout'] - log('Received and using process timeout from master: %i' % ( - process_timeout)) + log('Received and using process timeout from master: %i', + process_timeout) self.process_manager.max_timeout = process_timeout test_suite_data = self.taskdistribution.startTestSuite( node_title=config['test_node_title'], computer_guid=config['computer_id']) - if type(test_suite_data) == str: + if type(test_suite_data) is str: # Backward compatiblity test_suite_data = json.loads(test_suite_data) - test_suite_data = Utils.deunicodeData(test_suite_data) + test_suite_data = deunicodeData(test_suite_data) log("Got following test suite data from master : %r", test_suite_data) try: @@ -360,19 +336,18 @@ shared = true # Clean-up test suites self.checkOldTestSuite(test_suite_data) for test_suite in test_suite_data: - remote_test_result_needs_cleanup = False node_test_suite = self.getNodeTestSuite( test_suite["test_suite_reference"]) node_test_suite.edit( - working_directory=self.config['working_directory'], - log_directory=self.config['log_directory']) + working_directory=config['working_directory'], + log_directory=config['log_directory']) node_test_suite.edit(**test_suite) if my_test_type == 'UnitTest': runner = UnitTestRunner(node_test_suite) elif my_test_type == 'ScalabilityTest': - runner = ScalabilityTestRunner(self) + runner = ScalabilityTestRunner(self) else: log("testnode, Runner type %s not implemented.", my_test_type) raise NotImplementedError @@ -380,7 +355,6 @@ shared = true # XXX: temporary hack to prevent empty test_suite if not hasattr(node_test_suite, 'test_suite'): node_test_suite.edit(test_suite='') - run_software = True # kill processes from previous loop if any self.process_manager.killPreviousRun() if not self.updateRevisionList(node_test_suite): @@ -394,7 +368,6 @@ shared = true config['test_node_title'], False, node_test_suite.test_suite_title, node_test_suite.project_title) - remote_test_result_needs_cleanup = True log("testnode, test_result : %r", test_result) if test_result is not None: self.registerSuiteLog(test_result, node_test_suite) @@ -402,10 +375,10 @@ shared = true node_test_suite.edit(test_result=test_result) # get cluster configuration for this test suite, this is needed to # know slapos parameters to user for creating instances - log("Getting configuration from test suite " + str(node_test_suite.test_suite_title)) + log("Getting configuration from test suite %s", node_test_suite.test_suite_title) generated_config = self.taskdistribution.generateConfiguration(node_test_suite.test_suite_title) json_data = json.loads(generated_config) - cluster_configuration = Utils.deunicodeData(json_data['configuration_list'][0]) + cluster_configuration = deunicodeData(json_data['configuration_list'][0]) node_test_suite.edit(cluster_configuration=cluster_configuration) # Now prepare the installation of SlapOS and create instance status_dict = runner.prepareSlapOSForTestSuite(node_test_suite) @@ -437,10 +410,10 @@ shared = true raise NotImplementedError # break the loop to get latest priorities from master break - self.cleanUp(test_result) + self.cleanUp() except (SubprocessError, CalledProcessError, ConnectionError) as e: log("", exc_info=1) - if remote_test_result_needs_cleanup: + if test_result is not None: status_dict = getattr(e, "status_dict", None) or { 'stderr': "%s: %s" % (e.__class__.__name__, e)} test_result.reportFailure( @@ -454,7 +427,7 @@ shared = true log("", exc_info=1) if node_test_suite is not None: node_test_suite.retry_software_count += 1 - if remote_test_result_needs_cleanup: + if test_result is not None: test_result.reportFailure( command='', stdout='', stderr="ValueError was raised : %s" % (e,), @@ -464,10 +437,9 @@ shared = true self.process_manager.under_cancellation = False node_test_suite.retry = True continue - now = time.time() - self.cleanUp(test_result) - if (now-begin) < 120: - sleep_time = 120 - (now-begin) + self.cleanUp() + sleep_time = 120 - (time.time() - begin) + if sleep_time > 0: log("End of processing, going to sleep %s", sleep_time) time.sleep(sleep_time) except Exception: @@ -480,5 +452,5 @@ shared = true # groups working only in POSIX compilant systems # Exceptions are swallowed during cleanup phase log("GENERAL EXCEPTION, QUITING") - self.cleanUp(test_result) + self.cleanUp() log("GENERAL EXCEPTION, QUITING, cleanup finished")