Commit e73fd39f authored by Georgios Dagkakis's avatar Georgios Dagkakis

progress in creating the sub-batches

parent 3de1f971
...@@ -58,6 +58,7 @@ class BatchesWIPShort(plugin.InputPreparationPlugin): ...@@ -58,6 +58,7 @@ class BatchesWIPShort(plugin.InputPreparationPlugin):
# if we have stations that may share sub-batches # if we have stations that may share sub-batches
if len(group)>1: if len(group)>1:
currentBatchId='Batch_'+str(batchCounter)+'_WIP' currentBatchId='Batch_'+str(batchCounter)+'_WIP'
subBatchCounter=0
unitsToCompleteBatch=standardBatchUnits unitsToCompleteBatch=standardBatchUnits
group.sort(key=lambda x: self.getDistanceFromSource(data, x)) group.sort(key=lambda x: self.getDistanceFromSource(data, x))
for stationId in group: for stationId in group:
...@@ -76,13 +77,45 @@ class BatchesWIPShort(plugin.InputPreparationPlugin): ...@@ -76,13 +77,45 @@ class BatchesWIPShort(plugin.InputPreparationPlugin):
proceeded=complete - (complete % workingBatchSize) proceeded=complete - (complete % workingBatchSize)
currentCompleted=awaiting % workingBatchSize currentCompleted=awaiting % workingBatchSize
print buffered,proceeded,currentCompleted print buffered,proceeded,currentCompleted
# set the buffered sub-batches to the previous station
for i in range(int(buffered/workingBatchSize)):
bufferId=self.getPredecessors(data, stationId)[0]
self.createSubBatch(data, bufferId, currentBatchId, currentBatchId, subBatchCounter,
workingBatchSize,receiver=stationId)
subBatchCounter+=1
unitsToCompleteBatch-=workingBatchSize
if unitsToCompleteBatch==0:
subBatchCounter=0
batchCounter+=1
# set the buffered sub-batches to the previous station
for i in range(int(proceeded/workingBatchSize)):
bufferId=self.getSuccessors(data, stationId)[0]
self.createSubBatch(data, bufferId, currentBatchId, currentBatchId, subBatchCounter, workingBatchSize)
subBatchCounter+=1
unitsToCompleteBatch-=workingBatchSize
if unitsToCompleteBatch==0:
subBatchCounter=0
batchCounter+=1
# for stations that do not share sub-batches with others # for stations that do not share sub-batches with others
else: else:
pass pass
return data return data
# creates a sub-batch in a station
def createSubBatch(self,data,stationId,parentBatchId,parentBatchName,subBatchId,numberOfUnits,
unitsToProcess=0,receiver=None):
print 'creating',stationId,parentBatchId
data['graph']['node'][stationId]['wip'].append({
"_class": 'Dream.SubBatch',
"id": parentBatchId+'_SB_'+str(subBatchId)+'_wip',
"name":parentBatchName+'_SB_'+str(subBatchId)+'_wip',
"numberOfUnits":numberOfUnits,
"unitsToProcess": unitsToProcess,
"parentBatchId":parentBatchId,
"parentBatchName":parentBatchName,
"receiver":receiver
})
# gets the data and a station id and returns a list with all the stations that the station may share batches # gets the data and a station id and returns a list with all the stations that the station may share batches
def findSharingStations(self,data,stationId): def findSharingStations(self,data,stationId):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment