Commit 29132e38 authored by Sean McGivern's avatar Sean McGivern

Disable JobWaiter when there are many jobs

In WaitableWorker, when there are fewer than four jobs to process we
simply process them without going through Sidekiq. When there are more
jobs, we run them in Sidekiq but add a second argument for a JobWaiter
key, and wait for `timeout` seconds to see if those jobs finished.

When there are very many jobs in the batch, we will never finish the
batch before the timeout. This has these consequences:

1. The HTTP request takes longer as we're waiting for the timeout.
2. The jobs can't be deduplicated as the second argument is a unique
   JobWaiter key.

This change adds a feature flag, skip_job_waiter_for_large_batches, that
doesn't use the JobWaiter key when the batch size is more than ten times
the timeout, as we can process approximately ten jobs per second. By
default the flag is off.
parent 29fe14e2
...@@ -9,6 +9,15 @@ module WaitableWorker ...@@ -9,6 +9,15 @@ module WaitableWorker
# Short-circuit: it's more efficient to do small numbers of jobs inline # Short-circuit: it's more efficient to do small numbers of jobs inline
return bulk_perform_inline(args_list) if args_list.size <= 3 return bulk_perform_inline(args_list) if args_list.size <= 3
# Don't wait if there's too many jobs to be waited for. Not including the
# waiter allows them to be deduplicated and it skips waiting for jobs that
# are not likely to finish within the timeout. This assumes we can process
# 10 jobs per second:
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/205
if ::Feature.enabled?(:skip_job_waiter_for_large_batches)
return bulk_perform_async(args_list) if args_list.length >= 10 * timeout
end
waiter = Gitlab::JobWaiter.new(args_list.size, worker_label: self.to_s) waiter = Gitlab::JobWaiter.new(args_list.size, worker_label: self.to_s)
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]] # Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
......
...@@ -44,11 +44,35 @@ describe WaitableWorker do ...@@ -44,11 +44,35 @@ describe WaitableWorker do
expect(worker.counter).to eq(6) expect(worker.counter).to eq(6)
end end
it 'runs > 3 jobs using sidekiq' do it 'runs > 3 jobs using sidekiq and a waiter key' do
expect(worker).to receive(:bulk_perform_async) expect(worker).to receive(:bulk_perform_async)
.with([[1, anything], [2, anything], [3, anything], [4, anything]])
worker.bulk_perform_and_wait([[1], [2], [3], [4]]) worker.bulk_perform_and_wait([[1], [2], [3], [4]])
end end
it 'runs > 10 * timeout jobs using sidekiq and no waiter key' do
arguments = 1.upto(21).map { |i| [i] }
expect(worker).to receive(:bulk_perform_async).with(arguments)
worker.bulk_perform_and_wait(arguments, timeout: 2)
end
context 'when the skip_job_waiter_for_large_batches flag is disabled' do
before do
stub_feature_flags(skip_job_waiter_for_large_batches: false)
end
it 'runs jobs over 10 * the timeout using a waiter key' do
arguments = 1.upto(21).map { |i| [i] }
arguments_with_waiter = arguments.map { |arg| arg + [anything] }
expect(worker).to receive(:bulk_perform_async).with(arguments_with_waiter)
worker.bulk_perform_and_wait(arguments, timeout: 2)
end
end
end end
describe '.bulk_perform_inline' do describe '.bulk_perform_inline' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment