Commit e5eb6c70 authored by Jérome Perrin's avatar Jérome Perrin

ACO: simpler distribution using parrallel subprocesses

parent b8666f1f
...@@ -6,13 +6,19 @@ import time ...@@ -6,13 +6,19 @@ import time
import random import random
import operator import operator
import xmlrpclib import xmlrpclib
import signal
from multiprocessing import Pool
from dream.simulation.Queue import Queue from dream.simulation.Queue import Queue
from dream.simulation.Operator import Operator from dream.simulation.Operator import Operator
from dream.simulation.Globals import getClassFromName from dream.simulation.Globals import getClassFromName
class ACO(plugin.ExecutionPlugin): # run an ant in a subrocess. Can be parrallelized.
def runAntInSubProcess(ant):
ant['result'] = plugin.ExecutionPlugin.runOneScenario(ant['input'])['result']
return ant
class ACO(plugin.ExecutionPlugin):
def _calculateAntScore(self, ant): def _calculateAntScore(self, ant):
"""Calculate the score of this ant. """Calculate the score of this ant.
""" """
...@@ -57,6 +63,8 @@ class ACO(plugin.ExecutionPlugin): ...@@ -57,6 +63,8 @@ class ACO(plugin.ExecutionPlugin):
if distributor_url: if distributor_url:
distributor = xmlrpclib.Server(distributor_url) distributor = xmlrpclib.Server(distributor_url)
multiprocessorCount = data['general'].get('multiprocessorCount')
tested_ants = set() tested_ants = set()
start = time.time() # start counting execution time start = time.time() # start counting execution time
...@@ -97,6 +105,20 @@ class ACO(plugin.ExecutionPlugin): ...@@ -97,6 +105,20 @@ class ACO(plugin.ExecutionPlugin):
scenario_list.append(ant) scenario_list.append(ant)
if distributor is None: if distributor is None:
if multiprocessorCount:
self.logger.info("running multiprocessing ACO with %s processes" % multiprocessorCount)
# We unset our signal handler to print traceback at the end
# otherwise logs are confusing.
sigterm_handler = signal.getsignal(signal.SIGTERM)
pool = Pool(processes=multiprocessorCount)
try:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
scenario_list = pool.map(runAntInSubProcess, scenario_list)
pool.close()
pool.join()
finally:
signal.signal(signal.SIGTERM, sigterm_handler)
else:
# synchronous # synchronous
for ant in scenario_list: for ant in scenario_list:
ant['result'] = self.runOneScenario(ant['input'])['result'] ant['result'] = self.runOneScenario(ant['input'])['result']
......
...@@ -83,6 +83,7 @@ class BatchesACO(ACO): ...@@ -83,6 +83,7 @@ class BatchesACO(ACO):
# else run ACO # else run ACO
data['general']['numberOfSolutions']=1 # default of 1 solution for this instance data['general']['numberOfSolutions']=1 # default of 1 solution for this instance
data["general"]["distributorURL"]=None # no distributor currently, to be added in the GUI data["general"]["distributorURL"]=None # no distributor currently, to be added in the GUI
data["general"]["multiprocessorCount"] = 8 # number of parrallel processes, to be added to the GUI
ACO.run(self, data) ACO.run(self, data)
data["result"]["result_list"][-1]["score"] = '' data["result"]["result_list"][-1]["score"] = ''
data["result"]["result_list"][-1]["key"] = "Go To Results Page" data["result"]["result_list"][-1]["key"] = "Go To Results Page"
......
...@@ -59,7 +59,8 @@ class Plugin(object): ...@@ -59,7 +59,8 @@ class Plugin(object):
class ExecutionPlugin(Plugin): class ExecutionPlugin(Plugin):
"""Plugin to handle the execution of multiple simulation runs. """Plugin to handle the execution of multiple simulation runs.
""" """
def runOneScenario(self, data): @staticmethod
def runOneScenario(data):
"""default method for running one scenario """default method for running one scenario
""" """
return json.loads(simulate_line_json(input_data=json.dumps(data))) return json.loads(simulate_line_json(input_data=json.dumps(data)))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment