Commit 9b02b9f6 authored by Jacob Vosmaer's avatar Jacob Vosmaer

Only count running jobs in LimitedCapacity::Worker

The old implementation of LimitedCapacity::Worker relies on querying
the number of jobs enqueued for the worker it applies to. In
https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/447 we are
working towards a situation where you can no longer get this number
from Sidekiq (in an efficient way, at least).

This commit changes the capacity checking logic to only use the set of
"running jobs". The capacity check now happens in exactly one place,
namely LimitedCapacity::Worker#perform.
LimitedCapacity::Worker.perform_with_capacity now always schedules
max_running_jobs jobs. Sometimes these will be no-ops but that is OK,
we expect less than 0.01 no-op jobs per second on GitLab.com.
parent 1c837773
......@@ -3,21 +3,30 @@ module LimitedCapacity
class JobTracker # rubocop:disable Scalability/IdempotentWorker
include Gitlab::Utils::StrongMemoize
LUA_REGISTER_SCRIPT = <<~EOS
local set_key, element, max_elements = KEYS[1], ARGV[1], ARGV[2]
if redis.call("scard", set_key) < tonumber(max_elements) then
redis.call("sadd", set_key, element)
return true
end
return false
EOS
def initialize(namespace)
@namespace = namespace
end
def register(jid)
_added, @count = with_redis_pipeline do |redis|
register_job_keys(redis, jid)
get_job_count(redis)
end
def register(jid, max_jids)
with_redis do |redis|
redis.eval(LUA_REGISTER_SCRIPT, keys: [counter_key], argv: [jid, max_jids])
end.present?
end
def remove(jid)
_removed, @count = with_redis_pipeline do |redis|
with_redis do |redis|
remove_job_keys(redis, jid)
get_job_count(redis)
end
end
......@@ -25,14 +34,13 @@ module LimitedCapacity
completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids)
return unless completed_jids.any?
_removed, @count = with_redis_pipeline do |redis|
with_redis do |redis|
remove_job_keys(redis, completed_jids)
get_job_count(redis)
end
end
def count
@count ||= with_redis { |redis| get_job_count(redis) }
with_redis { |redis| redis.scard(counter_key) }
end
def running_jids
......@@ -49,14 +57,6 @@ module LimitedCapacity
"worker:#{namespace.to_s.underscore}:running"
end
def get_job_count(redis)
redis.scard(counter_key)
end
def register_job_keys(redis, keys)
redis.sadd(counter_key, keys)
end
def remove_job_keys(redis, keys)
redis.srem(counter_key, keys)
end
......@@ -64,11 +64,5 @@ module LimitedCapacity
def with_redis(&block)
Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord
end
def with_redis_pipeline(&block)
with_redis do |redis|
redis.pipelined(&block)
end
end
end
end
......@@ -55,26 +55,14 @@ module LimitedCapacity
def perform_with_capacity(*args)
worker = self.new
worker.remove_failed_jobs
worker.report_prometheus_metrics(*args)
required_jobs_count = worker.required_jobs_count(*args)
arguments = Array.new(required_jobs_count) { args }
arguments = Array.new(worker.max_running_jobs) { args }
self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext
end
end
def perform(*args)
return unless has_capacity?
job_tracker.register(jid)
report_running_jobs_metrics
perform_work(*args)
rescue StandardError => exception
raise
ensure
job_tracker.remove(jid)
report_prometheus_metrics(*args)
re_enqueue(*args) unless exception
perform_registered(*args) if job_tracker.register(jid, max_running_jobs)
end
def perform_work(*args)
......@@ -89,21 +77,6 @@ module LimitedCapacity
raise NotImplementedError
end
def has_capacity?
remaining_capacity > 0
end
def remaining_capacity
[
max_running_jobs - running_jobs_count - self.class.queue_size,
0
].max
end
def has_work?(*args)
remaining_work_count(*args) > 0
end
def remove_failed_jobs
job_tracker.clean_up
end
......@@ -114,18 +87,22 @@ module LimitedCapacity
set_metric(:max_running_jobs_gauge, max_running_jobs)
end
def report_running_jobs_metrics
set_metric(:running_jobs_gauge, running_jobs_count)
end
private
def required_jobs_count(*args)
[
remaining_work_count(*args),
remaining_capacity
].min
def perform_registered(*args)
report_running_jobs_metrics
perform_work(*args)
rescue StandardError => exception
raise
ensure
job_tracker.remove(jid)
report_prometheus_metrics(*args)
re_enqueue(*args) unless exception
end
private
def report_running_jobs_metrics
set_metric(:running_jobs_gauge, running_jobs_count)
end
def running_jobs_count
job_tracker.count
......@@ -138,8 +115,7 @@ module LimitedCapacity
end
def re_enqueue(*args)
return unless has_capacity?
return unless has_work?(*args)
return unless remaining_work_count(*args) > 0
self.class.perform_async(*args)
end
......
......@@ -7,30 +7,30 @@ RSpec.describe LimitedCapacity::JobTracker, :clean_gitlab_redis_queues do
described_class.new('namespace')
end
let(:max_jids) { 10 }
describe '#register' do
it 'adds jid to the set' do
job_tracker.register('a-job-id')
expect(job_tracker.register('a-job-id', max_jids)). to be true
expect(job_tracker.running_jids).to contain_exactly('a-job-id')
end
it 'updates the counter' do
expect { job_tracker.register('a-job-id') }
.to change { job_tracker.count }
.from(0)
.to(1)
end
it 'does it in only one Redis call' do
expect(job_tracker).to receive(:with_redis).once.and_call_original
it 'returns false if the jid was not added' do
max_jids = 2
%w[jid1 jid2].each do |jid|
expect(job_tracker.register(jid, max_jids)).to be true
end
job_tracker.register('a-job-id')
expect(job_tracker.register('jid3', max_jids)).to be false
expect(job_tracker.running_jids).to contain_exactly(*%w[jid1 jid2])
end
end
describe '#remove' do
before do
job_tracker.register(%w[a-job-id other-job-id])
%w[a-job-id other-job-id].each do |jid|
job_tracker.register(jid, max_jids)
end
end
it 'removes jid from the set' do
......@@ -38,24 +38,11 @@ RSpec.describe LimitedCapacity::JobTracker, :clean_gitlab_redis_queues do
expect(job_tracker.running_jids).to contain_exactly('a-job-id')
end
it 'updates the counter' do
expect { job_tracker.remove('other-job-id') }
.to change { job_tracker.count }
.from(2)
.to(1)
end
it 'does it in only one Redis call' do
expect(job_tracker).to receive(:with_redis).once.and_call_original
job_tracker.remove('other-job-id')
end
end
describe '#clean_up' do
before do
job_tracker.register('a-job-id')
job_tracker.register('a-job-id', max_jids)
end
context 'with running jobs' do
......@@ -83,13 +70,6 @@ RSpec.describe LimitedCapacity::JobTracker, :clean_gitlab_redis_queues do
.to change { job_tracker.running_jids.include?('a-job-id') }
end
it 'updates the counter' do
expect { job_tracker.clean_up }
.to change { job_tracker.count }
.from(1)
.to(0)
end
it 'gets the job ids, removes them, and updates the counter with only two Redis calls' do
expect(job_tracker).to receive(:with_redis).twice.and_call_original
......
......@@ -44,40 +44,22 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
describe '.perform_with_capacity' do
subject(:perform_with_capacity) { worker_class.perform_with_capacity(:arg) }
let(:max_running_jobs) { 3 }
before do
expect_next_instance_of(worker_class) do |instance|
expect(instance).to receive(:remove_failed_jobs)
expect(instance).to receive(:report_prometheus_metrics)
allow(instance).to receive(:remaining_work_count).and_return(remaining_work_count)
allow(instance).to receive(:remaining_capacity).and_return(remaining_capacity)
end
end
context 'when capacity is larger than work' do
let(:remaining_work_count) { 2 }
let(:remaining_capacity) { 3 }
it 'enqueues jobs for remaining work' do
expect(worker_class)
.to receive(:bulk_perform_async)
.with([[:arg], [:arg]])
perform_with_capacity
allow(instance).to receive(:max_running_jobs).and_return(max_running_jobs)
end
end
context 'when capacity is lower than work' do
let(:remaining_work_count) { 5 }
let(:remaining_capacity) { 3 }
it 'enqueues jobs for remaining work' do
expect(worker_class)
.to receive(:bulk_perform_async)
.with([[:arg], [:arg], [:arg]])
it 'enqueues jobs' do
expect(worker_class)
.to receive(:bulk_perform_async)
.with([[:arg], [:arg], [:arg]])
perform_with_capacity
end
perform_with_capacity
end
end
......@@ -104,34 +86,27 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
perform
end
it 'registers itself in the running set' do
it 'reports prometheus metrics' do
allow(worker).to receive(:perform_work)
expect(job_tracker).to receive(:register).with('my-jid')
expect(worker).to receive(:report_prometheus_metrics).once.and_call_original
expect(worker).to receive(:report_running_jobs_metrics).twice.and_call_original
perform
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove).with('my-jid')
it 'updates the running set' do
expect(job_tracker.running_jids).to be_empty
allow(worker).to receive(:perform_work)
perform
end
it 'reports prometheus metrics' do
allow(worker).to receive(:perform_work)
expect(worker).to receive(:report_prometheus_metrics).once.and_call_original
expect(worker).to receive(:report_running_jobs_metrics).twice.and_call_original
perform
expect(job_tracker.running_jids).to be_empty
end
end
context 'with capacity and without work' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(worker).to receive(:running_jobs_count).and_return(0)
allow(worker).to receive(:remaining_work_count).and_return(0)
allow(worker).to receive(:perform_work)
end
......@@ -146,7 +121,7 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
context 'without capacity' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(worker).to receive(:running_jobs_count).and_return(15)
allow(job_tracker).to receive(:register).and_return(false)
allow(worker).to receive(:remaining_work_count).and_return(10)
end
......@@ -161,27 +136,14 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
perform
end
it 'does not register in the running set' do
expect(job_tracker).not_to receive(:register)
perform
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove).with('my-jid')
perform
end
it 'reports prometheus metrics' do
expect(worker).to receive(:report_prometheus_metrics)
perform
end
end
context 'when perform_work fails' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(job_tracker).to receive(:register).and_return(true)
end
it 'does not re-enqueue itself' do
expect(worker).not_to receive(:re_enqueue)
......@@ -189,7 +151,7 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove)
expect(job_tracker).to receive(:remove).with('my-jid')
expect { perform }.to raise_error(NotImplementedError)
end
......@@ -202,65 +164,14 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
end
end
describe '#remaining_capacity' do
subject(:remaining_capacity) { worker.remaining_capacity }
before do
expect(worker).to receive(:max_running_jobs).and_return(max_capacity)
end
context 'when changing the capacity to a lower value' do
let(:max_capacity) { -1 }
it { expect(remaining_capacity).to eq(0) }
end
context 'when registering new jobs' do
let(:max_capacity) { 2 }
before do
job_tracker.register('a-job-id')
end
it { expect(remaining_capacity).to eq(1) }
end
context 'with jobs in the queue' do
let(:max_capacity) { 2 }
before do
expect(worker_class).to receive(:queue_size).and_return(1)
end
it { expect(remaining_capacity).to eq(1) }
end
context 'with both running jobs and queued jobs' do
let(:max_capacity) { 10 }
before do
expect(worker_class).to receive(:queue_size).and_return(5)
expect(worker).to receive(:running_jobs_count).and_return(3)
end
it { expect(remaining_capacity).to eq(2) }
end
end
describe '#remove_failed_jobs' do
subject(:remove_failed_jobs) { worker.remove_failed_jobs }
before do
job_tracker.register('a-job-id')
allow(worker).to receive(:max_running_jobs).and_return(2)
it 'removes failed jobs' do
job_tracker.register('a-job-id', 10)
expect(job_tracker).to receive(:clean_up).and_call_original
end
context 'with failed jobs' do
it 'update the available capacity' do
expect { remove_failed_jobs }.to change { worker.remaining_capacity }.by(1)
end
expect { remove_failed_jobs }.to change { job_tracker.running_jids.size }.by(-1)
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment