Commit b858a010 authored by Jérome Perrin's avatar Jérome Perrin

Example Plugin Architecture for ManPy

parent fc8b96eb
from copy import copy
import json
import time
import random
import operator
import xmlrpclib
from dream.simulation.Queue import Queue
from dream.simulation.Globals import getClassFromName
from . import Plugin
class ACOExecution(Plugin.DefaultExecutionPlugin):
def _calculateAntScore(self, ant):
"""Calculate the score of this ant.
totalDelay=0 #set the total delay to 0
jsonData=ant['result'] #read the result as JSON
elementList = jsonData['elementList'] #find the route of JSON
#loop through the elements
for element in elementList:
elementClass=element['_class'] #get the class
#id the class is Job
if elementClass=='Dream.Job':
delay = float(results.get('delay', "0"))
# A negative delay would mean we are ahead of schedule. This
# should not be considered better than being on time.
totalDelay += max(delay, 0)
return totalDelay
def run(self, data):
distributor_url = data['general'].get('distributorURL')
distributor = None
if distributor_url:
distributor = xmlrpclib.Server(distributor_url)"Distributed ACO using distributor from %s" % distributor)
tested_ants = set()
start = time.time() # start counting execution time
# the list of options collated into a dictionary for ease of referencing in
# ManPy
collated = dict()
for node_id, node in data['nodes'].items():
node_class = getClassFromName(node['_class'])
if issubclass(node_class, Queue):
collated[node_id] = list(node_class.getSupportedSchedulingRules())
max_results = data['general']['numberOfSolutions']
ants = [] #list of ants for keeping track of their performance
# Number of times new ants are to be created, i.e. number of generations (a
# generation can have more than 1 ant)
for i in range(data["general"]["numberOfGenerations"]):
scenario_list = [] # for the distributor
# number of ants created per generation
for j in range(data["general"]["numberOfAntsPerGenerations"]):
# an ant dictionary to contain rule to queue assignment information
ant = {}
# for each of the machines, rules are randomly picked from the
# options list
for k in collated.keys():
ant[k] = random.choice(collated[k])
# TODO: function to calculate ant id. Store ant id in ant dict
ant_key = repr(ant)
# if the ant was not already tested, only then test it
if ant_key not in tested_ants:
# set scheduling rule on queues based on ant data
ant_data = copy(data)
for k, v in ant.items():
ant_data["nodes"][k]['schedulingRule'] = v
ant['key'] = ant_key
ant['input'] = ant_data
if distributor is None:
# synchronous
for ant in scenario_list:
# TODO: adapt this.
ant['result'] =, ant['input'])
else: # asynchronous
job_id = distributor.requestSimulationRun(
[json.dumps(x) for x in scenario_list])"Job registered " + job_id)
while True:
result_list = distributor.getJobResult(job_id)
# The distributor returns None when calculation is still ongoing,
# or the list of result in the same order.
if result_list is not None:"Job terminated")
for ant, result in zip(scenario_list, result_list):
ant['result'] = json.loads(result)
for ant in scenario_list:
ant['score'] = self._calculateAntScore(ant)
# remove ants that outputs the same schedules
ants_without_duplicates = dict()
for ant in ants:
ant_result = copy(ant['result'])
ant_result['general'].pop('totalExecutionTime', None)
ant_result = json.dumps(ant_result, sort_keys=True)
ants_without_duplicates[ant_result] = ant
# The ants in this generation are ranked based on their scores and the
# best (max_results) are selected
ants = sorted(ants_without_duplicates.values(),
for l in ants:
# update the options list to ensure that good performing queue-rule
# combinations have increased representation and good chance of
# being selected in the next generation
for m in collated.keys():
# e.g. if using EDD gave good performance for Q1, then another
# 'EDD' is added to Q1 so there is a higher chance that it is
# selected by the next ants.
collated[m].append(l[m])"ACO finished, execution time %0.2fs" % (time.time() - start))
return ants
from copy import deepcopy
import json
from zope.dottedname.resolve import resolve
from dream.simulation.LineGenerationJSON import main as simulate_line_json
class Plugin(object):
"""Base class for Knowledge Extraction Plugin.
def __init__(self, logger=None):
self.logger = logger
class ExecutionPlugin(Plugin):
"""Plugin to handle the execution of multiple simulation runs.
def run(self, data):
"""General execution plugin.
raise NotImplementedError
class InputPreparationPlugin(Plugin):
def preprocess(self, data):
"""Preprocess the data before simulation run.
return data
class OutputPreparationPlugin(Plugin):
def postprocess(self, data):
"""Postprocess the data after simulation run.
return data
class DefaultExecutionPlugin(ExecutionPlugin):
"""Default Execution Plugin just executes one scenario.
def run(self, data):
return json.loads(simulate_line_json(input_data=json.dumps(data)))
class PluginRegistry(object):
"""Registry of plugins.
def __init__(self, logger,
self.input_preparation_list = tuple([resolve(name)(logger) for name in
self.output_preparation_list = tuple([resolve(name)(logger) for name in
self.execution_plugin = resolve(execution_plugin_class)(logger)
def run(self, data):
"""Preprocess, execute & postprocess.
for input_preparation in self.input_preparation_list:
data = input_preparation.preprocess(deepcopy(data))
data =
for output_preparation in self.output_preparation_list:
data = output_preparation.postprocess(deepcopy(data))
return data
from copy import copy
import json
import time
import random
import operator
from datetime import datetime
from dream.KnowledgeExtraction.plugins import Plugin
MACHINE_TYPE_SET = set(["Dream.MachineManagedJob", "Dream.MouldAssembly"])
class WIPPartSpreadsheet(Plugin.InputPreparationPlugin):
""" Input prepration to read parts route and wip from spreadsheet data.
def getMachineNameSet(self, step_name):
Give list of machines given a particular step name. For example
if step_name is "CAM", it will return ["CAM1", "CAM2"]
machine_name_set = set()
for machine_name in["nodes"].keys():
if machine_name.startswith(step_name):
return machine_name_set
def getNotMachineNodePredecessorList(self, step_name):
Give the list of all predecessors that are not of type machine
For example, for step_name "CAM", it may return "QCAM"
predecessor_list = []
machine_name_set = self.getMachineNameSet(step_name)
for edge in["edges"].values():
if edge[1] in machine_name_set:
predecessor_step = edge[0]
if predecessor_step in predecessor_list:
if not["nodes"][predecessor_step]["_class"] in MACHINE_TYPE_SET:
predecessor_list = [predecessor_step] + predecessor_list
predecessor_list = [x for x in self.getNotMachineNodePredecessorList(predecessor_step) \
if x not in predecessor_list] + predecessor_list
return predecessor_list
def getRouteList(self, sequence_list, processing_time_list, prerequisite_list):
# use to record which predecessor has been already done, used to avoid doing
# two times Decomposition
predecessor_set = set()
route_list = []
for j, sequence_step in enumerate(sequence_list):
for predecessor_step in self.getNotMachineNodePredecessorList(sequence_step):
# We avoid having two time Decomposition in the route. XXX Is this correct ?
if predecessor_step == "Decomposition" and predecessor_step in predecessor_set:
route = {"stationIdsList": [predecessor_step],
route = {"stationIdsList": list(self.getMachineNameSet(sequence_step)),
"processingTime": {"distributionType": "Fixed",
"mean": float(processing_time_list[j])},
"setupTime": {"distributionType": "Fixed",
"mean": .5}, # XXX hardcoded value
if prerequisite_list:
route["prerequisites"] = prerequisite_list
return route_list
def getListFromString(self, my_string):
my_list = []
if not my_string in (None, ''):
my_list = my_string.split('-')
return my_list
def preprocess(self, data):
""" Set the WIP in queue from spreadsheet data.
""" = data
now =
if data['general']['currentDate']:
now = datetime.strptime(data['general']['currentDate'], '%Y/%m/%d')
if 'wip_part_spreadsheet' in data:
wip_list = []
i = 0
wip_part_spreadsheet_length = len(data['wip_part_spreadsheet'])
while i < wip_part_spreadsheet_length:
value_list = data['wip_part_spreadsheet'][i]
if value_list[0] == 'Order ID' or not value_list[4]:
i += 1
order_dict = {}
order_id, due_date, priority, project_manager, part, part_type,\
sequence_list, processing_time_list, prerequisite_string = value_list
due_date = (datetime.strptime(due_date, '%Y/%m/%d') - now).days * 24
prerequisite_list = self.getListFromString(prerequisite_string)
sequence_list = sequence_list.split('-')
processing_time_list = processing_time_list.split('-')
order_dict["_class"] = "Dream.Order"
order_dict["id"] = "%i" % i # XXX hack, we use it in UI to retrieve spreadsheet line
order_dict["manager"] = project_manager
order_dict["name"] = order_id
order_dict["dueDate"] = due_date
order_dict["priority"] = float(priority)
# XXX make it dynamic by writing a function that will reuse the
# code available a bit after
order_dict["route"] = self.getRouteList(sequence_list, processing_time_list,
i += 1
component_list = []
if i < wip_part_spreadsheet_length:
while data['wip_part_spreadsheet'][i][0] in (None, ''):
value_list = data['wip_part_spreadsheet'][i]
if value_list[4] in (None, ''):
order_id, due_date, priority, project_manager, part, part_type,\
sequence_list, processing_time_list, prerequisite_string = value_list
sequence_list = sequence_list.split('-')
prerequisite_list = self.getListFromString(prerequisite_string)
processing_time_list = processing_time_list.split('-')
component_dict = {}
component_dict["_class"] = "Dream.OrderComponent"
if part_type == "Mould":
component_dict["_class"] = "Dream.Mould"
component_dict["componentType"] = part_type
component_dict["id"] = "%i" % i # XXX hack, we use it in UI to retrieve spreadsheet line
component_dict["name"] = part
route_list = self.getRouteList(sequence_list, processing_time_list,
if part_type == "Mould":
route_list = route_list[1:]
component_dict["route"] = route_list
order_dict["componentsList"] = component_list
data["nodes"]["QStart"]["wip"] = wip_list
return data
# ===========================================================================
# Copyright 2013 University of Limerick
# This file is part of DREAM.
# DREAM is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# DREAM is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with DREAM. If not, see <>.
# ===========================================================================
# See
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
\ No newline at end of file
......@@ -376,6 +376,7 @@ class Simulation(object):
def _preprocess(self, data):
"""Preprocess the data, for instance reading spreadsheet.
# TODO: plugin
# by default we add an event generator if using queue stats
if self.getConfigurationDict()["Dream-Configuration"]["gui"]["queue_stat"]:
for node in data["nodes"].values():
......@@ -17,11 +17,13 @@ setup(
'rpy2>=2.3,<2.4', # 2.4.1 does not work for me
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment