Commit f386921b authored by Imre Farkas's avatar Imre Farkas

Add jitter when scheduling low urgency project auth update workers

New options added to ApplicationWorker#bulk_perform_in:
- batch_size
- batch_delay
parent a6fc61a1
...@@ -19,7 +19,8 @@ class UserProjectAccessChangedService ...@@ -19,7 +19,8 @@ class UserProjectAccessChangedService
if priority == HIGH_PRIORITY if priority == HIGH_PRIORITY
AuthorizedProjectsWorker.bulk_perform_async(bulk_args) # rubocop:disable Scalability/BulkPerformWithContext AuthorizedProjectsWorker.bulk_perform_async(bulk_args) # rubocop:disable Scalability/BulkPerformWithContext
else else
AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker.bulk_perform_in(DELAY, bulk_args) # rubocop:disable Scalability/BulkPerformWithContext AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker.bulk_perform_in( # rubocop:disable Scalability/BulkPerformWithContext
DELAY, bulk_args, batch_size: 100, batch_delay: 30.seconds)
end end
end end
end end
......
...@@ -84,7 +84,7 @@ module ApplicationWorker ...@@ -84,7 +84,7 @@ module ApplicationWorker
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list) Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end end
def bulk_perform_in(delay, args_list) def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil)
now = Time.now.to_i now = Time.now.to_i
schedule = now + delay.to_i schedule = now + delay.to_i
...@@ -92,7 +92,14 @@ module ApplicationWorker ...@@ -92,7 +92,14 @@ module ApplicationWorker
raise ArgumentError, _('The schedule time must be in the future!') raise ArgumentError, _('The schedule time must be in the future!')
end end
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule) if batch_size && batch_delay
args_list.each_slice(batch_size.to_i).with_index do |args_batch, idx|
batch_schedule = schedule + idx * batch_delay.to_i
Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule)
end
else
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
end
end end
end end
end end
...@@ -558,7 +558,9 @@ describe Projects::CreateService, '#execute' do ...@@ -558,7 +558,9 @@ describe Projects::CreateService, '#execute' do
) )
expect(AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker).to( expect(AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker).to(
receive(:bulk_perform_in) receive(:bulk_perform_in)
.with(1.hour, array_including([user.id], [other_user.id])) .with(1.hour,
array_including([user.id], [other_user.id]),
batch_delay: 30.seconds, batch_size: 100)
.and_call_original .and_call_original
) )
......
...@@ -20,7 +20,11 @@ describe UserProjectAccessChangedService do ...@@ -20,7 +20,11 @@ describe UserProjectAccessChangedService do
it 'permits low-priority operation' do it 'permits low-priority operation' do
expect(AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker).to( expect(AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker).to(
receive(:bulk_perform_in).with(described_class::DELAY, [[1], [2]]) receive(:bulk_perform_in).with(
described_class::DELAY,
[[1], [2]],
{ batch_delay: 30.seconds, batch_size: 100 }
)
) )
described_class.new([1, 2]).execute(blocking: false, described_class.new([1, 2]).execute(blocking: false,
......
...@@ -118,5 +118,45 @@ describe ApplicationWorker do ...@@ -118,5 +118,45 @@ describe ApplicationWorker do
.to raise_error(ArgumentError) .to raise_error(ArgumentError)
end end
end end
context 'with batches' do
let(:batch_delay) { 1.minute }
it 'correctly schedules jobs' do
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => [['Foo', [1]], ['Foo', [2]]]))
.ordered
.and_call_original)
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => [['Foo', [3]], ['Foo', [4]]]))
.ordered
.and_call_original)
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => [['Foo', [5]]]))
.ordered
.and_call_original)
worker.bulk_perform_in(
1.minute,
[['Foo', [1]], ['Foo', [2]], ['Foo', [3]], ['Foo', [4]], ['Foo', [5]]],
batch_size: 2, batch_delay: batch_delay)
expect(worker.jobs.count).to eq 5
expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at'])
expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at'])
expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay)
expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay)
end
context 'when batch_size is invalid' do
it 'raises an ArgumentError exception' do
expect do
worker.bulk_perform_in(1.minute,
[['Foo']],
batch_size: -1, batch_delay: batch_delay)
end.to raise_error(ArgumentError)
end
end
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment