Commit 49141e3e authored by Manoj M J's avatar Manoj M J

Limit Sidekiq `push_bulk` to a maximum of 1000 jobs in one go

This change limits Sidekiq `push_bulk`
to a maximum of 1000 jobs in one go.
parent a0d10fed
......@@ -14,6 +14,7 @@ module ApplicationWorker
LOGGING_EXTRA_KEY = 'extra'
DEFAULT_DELAY_INTERVAL = 1
SAFE_PUSH_BULK_LIMIT = 1000
included do
set_queue
......@@ -135,24 +136,47 @@ module ApplicationWorker
end
def bulk_perform_async(args_list)
if Feature.enabled?(:sidekiq_push_bulk_in_batches)
in_safe_limit_batches(args_list) do |args_batch, _|
Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch)
end
else
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end
end
def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil)
now = Time.now.to_i
schedule = now + delay.to_i
base_schedule_at = now + delay.to_i
if schedule <= now
raise ArgumentError, _('The schedule time must be in the future!')
if base_schedule_at <= now
raise ArgumentError, 'The schedule time must be in the future!'
end
schedule_at = base_schedule_at
if batch_size && batch_delay
args_list.each_slice(batch_size.to_i).with_index do |args_batch, idx|
batch_schedule = schedule + idx * batch_delay.to_i
Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule)
batch_size = batch_size.to_i
batch_delay = batch_delay.to_i
raise ArgumentError, 'batch_size should be greater than 0' unless batch_size > 0
raise ArgumentError, 'batch_delay should be greater than 0' unless batch_delay > 0
# build an array of schedules corresponding to each item in `args_list`
bulk_schedule_at = Array.new(args_list.size) do |index|
batch_number = index / batch_size
base_schedule_at + (batch_number * batch_delay)
end
schedule_at = bulk_schedule_at
end
if Feature.enabled?(:sidekiq_push_bulk_in_batches)
in_safe_limit_batches(args_list, schedule_at) do |args_batch, schedule_at_for_batch|
Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => schedule_at_for_batch)
end
else
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule_at)
end
end
......@@ -161,5 +185,34 @@ module ApplicationWorker
def delay_interval
DEFAULT_DELAY_INTERVAL.seconds
end
private
def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT)
# `schedule_at` could be one of
# - nil.
# - a single Numeric that represents time, like `30.minutes.from_now.to_i`.
# - an array, where each element is a Numeric that reprsents time.
# - Each element in this array would correspond to the time at which
# - the job in `args_list` at the corresponding index needs to be scheduled.
# In the case where `schedule_at` is an array of Numeric, it needs to be sliced
# in the same manner as the `args_list`, with each slice containing `safe_limit`
# number of elements.
schedule_at = schedule_at.each_slice(safe_limit).to_a if schedule_at.is_a?(Array)
args_list.each_slice(safe_limit).with_index.flat_map do |args_batch, index|
schedule_at_for_batch = process_schedule_at_for_batch(schedule_at, index)
yield(args_batch, schedule_at_for_batch)
end
end
def process_schedule_at_for_batch(schedule_at, index)
return unless schedule_at
return schedule_at[index] if schedule_at.is_a?(Array) && schedule_at.all?(Array)
schedule_at
end
end
end
---
name: sidekiq_push_bulk_in_batches
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72263
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/343740
milestone: '14.5'
type: development
group: group::access
default_enabled: false
......@@ -34211,9 +34211,6 @@ msgstr ""
msgid "The same shared runner executes code from multiple projects, unless you configure autoscaling with %{link} set to 1 (which it is on GitLab.com)."
msgstr ""
msgid "The schedule time must be in the future!"
msgstr ""
msgid "The snippet can be accessed without any authentication."
msgstr ""
......
......@@ -285,48 +285,38 @@ RSpec.describe ApplicationWorker do
end
end
describe '.bulk_perform_async' do
context 'different kinds of push_bulk' do
shared_context 'disable the `sidekiq_push_bulk_in_batches` feature flag' do
before do
stub_const(worker.name, worker)
stub_feature_flags(sidekiq_push_bulk_in_batches: false)
end
it 'enqueues jobs in bulk' do
Sidekiq::Testing.fake! do
worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]])
expect(worker.jobs.count).to eq 2
expect(worker.jobs).to all(include('enqueued_at'))
end
shared_context 'set safe limit beyond the number of jobs to be enqueued' do
before do
stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", args.count + 1)
end
end
describe '.bulk_perform_in' do
shared_context 'set safe limit below the number of jobs to be enqueued' do
before do
stub_const(worker.name, worker)
stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", 2)
end
end
context 'when delay is valid' do
it 'correctly schedules jobs' do
Sidekiq::Testing.fake! do
worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
shared_examples_for 'returns job_id of all enqueued jobs' do
let(:job_id_regex) { /[0-9a-f]{12}/ }
expect(worker.jobs.count).to eq 2
expect(worker.jobs).to all(include('at'))
end
end
end
it 'returns job_id of all enqueued jobs' do
job_ids = perform_action
context 'when delay is invalid' do
it 'raises an ArgumentError exception' do
expect { worker.bulk_perform_in(-60, [['Foo']]) }
.to raise_error(ArgumentError)
expect(job_ids.count).to eq(args.count)
expect(job_ids).to all(match(job_id_regex))
end
end
context 'with batches' do
let(:batch_delay) { 1.minute }
it 'correctly schedules jobs' do
shared_examples_for 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' do
it 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' do
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => [['Foo', [1]], ['Foo', [2]]]))
.ordered
......@@ -340,25 +330,254 @@ RSpec.describe ApplicationWorker do
.ordered
.and_call_original)
worker.bulk_perform_in(
1.minute,
[['Foo', [1]], ['Foo', [2]], ['Foo', [3]], ['Foo', [4]], ['Foo', [5]]],
batch_size: 2, batch_delay: batch_delay)
perform_action
expect(worker.jobs.count).to eq args.count
expect(worker.jobs).to all(include('enqueued_at'))
end
end
shared_examples_for 'enqueues jobs in one go' do
it 'enqueues jobs in one go' do
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => args)).once.and_call_original)
perform_action
expect(worker.jobs.count).to eq args.count
expect(worker.jobs).to all(include('enqueued_at'))
end
end
before do
stub_const(worker.name, worker)
end
let(:args) do
[
['Foo', [1]],
['Foo', [2]],
['Foo', [3]],
['Foo', [4]],
['Foo', [5]]
]
end
describe '.bulk_perform_async' do
shared_examples_for 'does not schedule the jobs for any specific time' do
it 'does not schedule the jobs for any specific time' do
perform_action
expect(worker.jobs).to all(exclude('at'))
end
end
subject(:perform_action) do
worker.bulk_perform_async(args)
end
context 'push_bulk in safe limit batches' do
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'does not schedule the jobs for any specific time'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'does not schedule the jobs for any specific time'
end
context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'does not schedule the jobs for any specific time'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'does not schedule the jobs for any specific time'
end
end
end
end
describe '.bulk_perform_in' do
context 'without batches' do
shared_examples_for 'schedules all the jobs at a specific time' do
it 'schedules all the jobs at a specific time' do
perform_action
worker.jobs.each do |job_detail|
expect(job_detail['at']).to be_within(3.seconds).of(expected_scheduled_at_time)
end
end
end
let(:delay) { 3.minutes }
let(:expected_scheduled_at_time) { Time.current.to_i + delay.to_i }
subject(:perform_action) do
worker.bulk_perform_in(delay, args)
end
context 'when the scheduled time falls in the past' do
let(:delay) { -60 }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
context 'push_bulk in safe limit batches' do
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time'
end
context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time'
end
end
end
end
context 'with batches' do
shared_examples_for 'schedules all the jobs at a specific time, per batch' do
it 'schedules all the jobs at a specific time, per batch' do
perform_action
expect(worker.jobs.count).to eq 5
expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at'])
expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at'])
expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay)
expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay)
end
end
let(:delay) { 1.minute }
let(:batch_size) { 2 }
let(:batch_delay) { 10.minutes }
subject(:perform_action) do
worker.bulk_perform_in(delay, args, batch_size: batch_size, batch_delay: batch_delay)
end
context 'when the `batch_size` is invalid' do
context 'when `batch_size` is 0' do
let(:batch_size) { 0 }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
context 'when `batch_size` is negative' do
let(:batch_size) { -3 }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
end
context 'when the `batch_delay` is invalid' do
context 'when `batch_delay` is 0' do
let(:batch_delay) { 0.minutes }
context 'when batch_size is invalid' do
it 'raises an ArgumentError exception' do
expect do
worker.bulk_perform_in(1.minute,
[['Foo']],
batch_size: -1, batch_delay: batch_delay)
end.to raise_error(ArgumentError)
expect { perform_action }
.to raise_error(ArgumentError)
end
end
context 'when `batch_delay` is negative' do
let(:batch_delay) { -3.minutes }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
end
context 'push_bulk in safe limit batches' do
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time, per batch'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time, per batch'
end
context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time, per batch'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time, per batch'
end
end
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment