Commit e7940670 authored by Alain Takoudjou's avatar Alain Takoudjou

grid.promise: kill timed out promise process if terminate is not enough

parent 6380686c
...@@ -40,7 +40,7 @@ import psutil ...@@ -40,7 +40,7 @@ import psutil
from multiprocessing import Process, Queue as MQueue from multiprocessing import Process, Queue as MQueue
import Queue import Queue
from slapos.util import mkdir_p, chownDirectory from slapos.util import mkdir_p, chownDirectory
from slapos.grid.utils import dropPrivileges from slapos.grid.utils import dropPrivileges, killProcessTree
from slapos.grid.promise import interface from slapos.grid.promise import interface
from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult, from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult,
AnomalyResult, TestResult, AnomalyResult, TestResult,
...@@ -314,15 +314,15 @@ class PromiseLauncher(object): ...@@ -314,15 +314,15 @@ class PromiseLauncher(object):
if not os.path.exists(self.promise_output_dir): if not os.path.exists(self.promise_output_dir):
mkdir_p(self.promise_output_dir) mkdir_p(self.promise_output_dir)
def _getErrorPromiseResult(self, promise_process, promise_name, message, def _getErrorPromiseResult(self, promise_process, promise_name, promise_path,
execution_time=0): message, execution_time=0):
if self.check_anomaly: if self.check_anomaly:
result = AnomalyResult(problem=True, message=message) result = AnomalyResult(problem=True, message=message)
else: else:
result = TestResult(problem=True, message=message) result = TestResult(problem=True, message=message)
return PromiseQueueResult( return PromiseQueueResult(
item=result, item=result,
path=os.path.join(self.promise_folder, promise_name), path=promise_path,
name=promise_name, name=promise_name,
title=promise_process.getPromiseTitle(), title=promise_process.getPromiseTitle(),
execution_time=execution_time execution_time=execution_time
...@@ -362,6 +362,14 @@ class PromiseLauncher(object): ...@@ -362,6 +362,14 @@ class PromiseLauncher(object):
)) ))
return result return result
def _emptyQueue(self):
"""Remove all entries from queue until it's empty"""
while True:
try:
self.queue_result.get_nowait()
except Queue.Empty:
return
def _launchPromise(self, promise_name, promise_path, argument_dict, def _launchPromise(self, promise_name, promise_path, argument_dict,
wrap_process=False): wrap_process=False):
""" """
...@@ -394,6 +402,9 @@ class PromiseLauncher(object): ...@@ -394,6 +402,9 @@ class PromiseLauncher(object):
self.logger.error(result.item.message) self.logger.error(result.item.message)
return True return True
return False return False
# we can do this because we run processes one by one
# we cleanup queue in case previous result was written by a killed process
self._emptyQueue()
promise_process.start() promise_process.start()
except Exception: except Exception:
# only print traceback to not prevent run other promises # only print traceback to not prevent run other promises
...@@ -444,11 +455,16 @@ class PromiseLauncher(object): ...@@ -444,11 +455,16 @@ class PromiseLauncher(object):
execution_time = (current_increment + 1) * sleep_time execution_time = (current_increment + 1) * sleep_time
else: else:
promise_process.terminate() promise_process.terminate()
promise_process.join() # wait for process to terminate promise_process.join(1) # wait for process to terminate
# if the process is still alive after 1 seconds, we kill it
if promise_process.is_alive():
self.logger.info("Killing process %s..." % promise_name)
killProcessTree(promise_process.pid, self.logger)
message = 'Promise timed out after %s seconds' % self.promise_timeout message = 'Promise timed out after %s seconds' % self.promise_timeout
queue_item = self._getErrorPromiseResult( queue_item = self._getErrorPromiseResult(
promise_process, promise_process,
promise_name=promise_name, promise_name=promise_name,
promise_path=promise_path,
message=message, message=message,
execution_time=execution_time execution_time=execution_time
) )
...@@ -457,6 +473,7 @@ class PromiseLauncher(object): ...@@ -457,6 +473,7 @@ class PromiseLauncher(object):
queue_item = self._getErrorPromiseResult( queue_item = self._getErrorPromiseResult(
promise_process, promise_process,
promise_name=promise_name, promise_name=promise_name,
promise_path=promise_path,
message="No output returned by the promise", message="No output returned by the promise",
execution_time=execution_time execution_time=execution_time
) )
......
...@@ -37,6 +37,8 @@ import stat ...@@ -37,6 +37,8 @@ import stat
import subprocess import subprocess
import sys import sys
import logging import logging
import psutil
import time
from slapos.grid.exception import BuildoutFailedError, WrongPermissionError from slapos.grid.exception import BuildoutFailedError, WrongPermissionError
...@@ -361,3 +363,33 @@ def createPrivateDirectory(path): ...@@ -361,3 +363,33 @@ def createPrivateDirectory(path):
raise WrongPermissionError('Wrong permissions in %s: ' raise WrongPermissionError('Wrong permissions in %s: '
'is 0%o, should be 0700' 'is 0%o, should be 0700'
% (path, permission)) % (path, permission))
def killProcessTree(pid, logger):
"""
kill all process Tree
We first suspend processes to prevent them from reacting to signals
"""
try:
process = psutil.Process(pid)
process.suspend()
except psutil.Error:
return
process_list = [process]
running_process_list = process.children(recursive=True)
while running_process_list:
process_list += running_process_list
for child in running_process_list:
try:
child.suspend()
except psutil.Error, e:
logger.debug(str(e))
time.sleep(0.2)
running_process_list = set(process.children(recursive=True)).difference(process_list)
for process in process_list:
try:
process.kill()
except psutil.Error, e:
logger.debug("Process kill: %s" % e)
...@@ -69,7 +69,7 @@ class TestSlapOSPromiseMixin(unittest.TestCase): ...@@ -69,7 +69,7 @@ class TestSlapOSPromiseMixin(unittest.TestCase):
if sys.path[0] == self.plugin_dir: if sys.path[0] == self.plugin_dir:
del sys.path[0] del sys.path[0]
def configureLauncher(self, save_method=None, timeout=0.5, master_url="", debug=False, def configureLauncher(self, save_method=None, timeout=1, master_url="", debug=False,
run_list=[], uid=None, gid=None, enable_anomaly=False, force=False, run_list=[], uid=None, gid=None, enable_anomaly=False, force=False,
logdir=True, dry_run=False): logdir=True, dry_run=False):
parameter_dict = { parameter_dict = {
...@@ -927,13 +927,42 @@ exit 0 ...@@ -927,13 +927,42 @@ exit 0
self.configureLauncher(save_method=test_method, enable_anomaly=True, timeout=1) self.configureLauncher(save_method=test_method, enable_anomaly=True, timeout=1)
self.generatePromiseScript(promise_name, success=True, content="""import time self.generatePromiseScript(promise_name, success=True, content="""import time
time.sleep(5)""") time.sleep(20)""")
# run promise will timeout # run promise will timeout
with self.assertRaises(PromiseError): with self.assertRaises(PromiseError):
self.launcher.run() self.launcher.run()
self.assertTrue(self.called) self.assertTrue(self.called)
def test_runpromise_wrapped_will_timeout(self):
promise_name = "my_bash_promise"
promise_path = os.path.join(self.legacy_promise_dir, promise_name)
self.called = False
with open(promise_path, 'w') as f:
f.write("""#!/bin/bash
sleep 20
echo "success"
""")
os.chmod(promise_path, 0744)
def test_method(result):
self.called = True
self.assertTrue(isinstance(result, PromiseQueueResult))
self.assertTrue(isinstance(result.item, TestResult))
self.assertTrue(result.execution_time >= 1)
self.assertEquals(result.title, promise_name)
self.assertEquals(result.name, promise_name)
self.assertEquals(result.path, promise_path)
self.assertTrue("Promise timed out after" in result.item.message)
self.assertEquals(result.item.hasFailed(), True)
self.assertTrue(isinstance(result.item.date, datetime))
self.configureLauncher(save_method=test_method, timeout=1)
# run promise will timeout
with self.assertRaises(PromiseError):
self.launcher.run()
self.assertTrue(self.called)
class TestSlapOSGenericPromise(TestSlapOSPromiseMixin): class TestSlapOSGenericPromise(TestSlapOSPromiseMixin):
def initialisePromise(self, promise_content="", success=True, timeout=60): def initialisePromise(self, promise_content="", success=True, timeout=60):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment