Commit 26a2a077 authored by Michael Kozono, committed by Mayra Cabrera

Geo: Simplify job picking algorithm for schedulers

parent 10f6f8c7
......@@ -127,48 +127,29 @@ module Geo
pending_resources.empty?
end
# rubocop: disable CodeReuse/ActiveRecord
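# Takes elements from the front of each array in round-robin fashion, one at
# a time, until at most `batch_size` elements have been taken or every array
# is empty. The relative order of elements within each array is preserved.
# `batch_size` defaults to `db_retrieve_batch_size`. The given arrays are
# mutated.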
#
# @return [Array] an array composed of elements from the given arrays
def take_batch(*arrays, batch_size: db_retrieve_batch_size)
interleave(*arrays).uniq.compact.take(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
result = []
total = arrays.sum(&:length)
iterations = [total, batch_size].min
# Combines the elements of multiple, arbitrary-length arrays into a single array.
#
# Each array is spread evenly over the resultant array.
# The order of the original arrays is preserved within the resultant array.
# In the case of ties between elements, the element from the first array goes first.
# From https://stackoverflow.com/questions/15628936/ruby-equally-distribute-elements-and-interleave-merge-multiple-arrays/15639147#15639147
#
# For examples, see the specs in file_download_dispatch_worker_spec.rb
def interleave(*arrays)
elements = []
coefficients = []
arrays.each_with_index do |e, index|
elements += e
coefficients += interleave_coefficients(e, index)
iterations.times do
arrays.size.times do
if arrays.first.any?
result << arrays.first.shift
arrays.rotate! # round-robin
break
end
combined = elements.zip(coefficients)
combined.sort_by { |zipped| zipped[1] }.map { |zipped| zipped[0] }
arrays.shift # remove empty array
end
# Assigns a position to each element in order to spread out arrays evenly.
#
# `array_index` is used to resolve ties between arrays of equal length.
#
# Examples:
#
# irb(main):006:0> interleave_coefficients(['a', 'b'], 0)
# => [0.2499998750000625, 0.7499996250001875]
# irb(main):027:0> interleave_coefficients(['a', 'b', 'c'], 0)
# => [0.16666661111112963, 0.4999998333333889, 0.8333330555556481]
# irb(main):007:0> interleave_coefficients(['a', 'b', 'c'], 1)
# => [0.16699994433335189, 0.5003331665556111, 0.8336663887778704]
def interleave_coefficients(array, array_index)
(1..array.size).map do |i|
(i - 0.5 + array_index / 1000.0) / (array.size + 1e-6)
end
result.compact.uniq
end
def update_jobs_in_progress
......
......@@ -21,10 +21,10 @@ RSpec.describe Geo::Scheduler::SchedulerWorker, :geo do
expect(subject).to receive(:db_retrieve_batch_size).and_return(4)
expect(subject.send(:take_batch, a, b, c)).to eq([
[3, :job_artifact],
[2, :lfs],
[8, :job_artifact],
[3, :lfs]
[3, :job_artifact],
[3, :lfs],
[8, :job_artifact]
])
end
end
......@@ -32,46 +32,10 @@ RSpec.describe Geo::Scheduler::SchedulerWorker, :geo do
context 'with batch_size' do
it 'returns a batch of jobs' do
expect(subject.send(:take_batch, a, b, c, batch_size: 2)).to eq([
[3, :job_artifact],
[2, :lfs]
[2, :lfs],
[3, :job_artifact]
])
end
end
end
# Examples for `interleave`, which merges several arrays into one while
# spreading each array's elements across the result. The method is invoked
# through `send` (presumably because it is private -- confirm in the worker).
describe '#interleave' do
# Equal-length arrays tie at every position; ties are resolved by taking the
# element from the earlier ("first") array, so the result alternates 1, A, 2, B...
it 'interleaves 2 arrays' do
a = %w{1 2 3}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3 C})
end
# No ties in this call: the longer second array supplies the first and last
# elements, and the shorter array's elements fall in between.
it 'interleaves 2 arrays with a longer second array' do
a = %w{1 2}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{A 1 B 2 C})
end
# Mirror of the previous example: the longer first array supplies the first
# and last elements of the expected result.
it 'interleaves 2 arrays with a longer first array' do
a = %w{1 2 3}
b = %w{A B}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3})
end
# Three equal-length arrays: every position ties, so the expected result
# cycles through the arrays in argument order.
it 'interleaves 3 arrays' do
a = %w{1 2 3}
b = %w{A B C}
c = %w{i ii iii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{1 A i 2 B ii 3 C iii})
end
# Unequal lengths: the longest array (c) is spread over the whole result and
# the single-element array (b) is expected in the middle.
it 'interleaves 3 arrays of unequal length' do
a = %w{1 2}
b = %w{A}
c = %w{i ii iii iiii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{i 1 ii A iii 2 iiii})
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment