Commit 5c4f99a3 authored by Yorick Peterse's avatar Yorick Peterse

Merge branch 'fix/gb/process-scheduled-background-migrations' into 'master'

Process scheduled background migrations as well

Closes #34951

See merge request !12787
parents c64fd519 e41d42d6
module Gitlab module Gitlab
module BackgroundMigration module BackgroundMigration
def self.queue
@queue ||= BackgroundMigrationWorker.sidekiq_options['queue']
end
# Begins stealing jobs from the background migrations queue, blocking the # Begins stealing jobs from the background migrations queue, blocking the
# caller until all jobs have been completed. # caller until all jobs have been completed.
# #
# When a migration raises a StandardError is is going to be retries up to
# three times, for example, to recover from a deadlock.
#
# When Exception is being raised, it enqueues the migration again, and
# re-raises the exception.
#
# steal_class - The name of the class for which to steal jobs. # steal_class - The name of the class for which to steal jobs.
def self.steal(steal_class) def self.steal(steal_class)
queue = Sidekiq::Queue enqueued = Sidekiq::Queue.new(self.queue)
.new(BackgroundMigrationWorker.sidekiq_options['queue']) scheduled = Sidekiq::ScheduledSet.new
[scheduled, enqueued].each do |queue|
queue.each do |job| queue.each do |job|
migration_class, migration_args = job.args migration_class, migration_args = job.args
next unless job.queue == self.queue
next unless migration_class == steal_class next unless migration_class == steal_class
perform(migration_class, migration_args) begin
perform(migration_class, migration_args, retries: 3) if job.delete
rescue Exception # rubocop:disable Lint/RescueException
BackgroundMigrationWorker # enqueue this migration again
.perform_async(migration_class, migration_args)
job.delete raise
end
end
end end
end end
##
# Performs a background migration.
#
# class_name - The name of the background migration class as defined in the # class_name - The name of the background migration class as defined in the
# Gitlab::BackgroundMigration namespace. # Gitlab::BackgroundMigration namespace.
# #
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::BackgroundMigration do describe Gitlab::BackgroundMigration do
describe '.queue' do
it 'returns background migration worker queue' do
expect(described_class.queue)
.to eq BackgroundMigrationWorker.sidekiq_options['queue']
end
end
describe '.steal' do describe '.steal' do
it 'steals jobs from a queue' do context 'when there are enqueued jobs present' do
queue = [double(:job, args: ['Foo', [10, 20]])] let(:queue) do
[double(args: ['Foo', [10, 20]], queue: described_class.queue)]
end
before do
allow(Sidekiq::Queue).to receive(:new) allow(Sidekiq::Queue).to receive(:new)
.with(BackgroundMigrationWorker.sidekiq_options['queue']) .with(described_class.queue)
.and_return(queue) .and_return(queue)
end
expect(queue[0]).to receive(:delete) context 'when queue contains unprocessed jobs' do
it 'steals jobs from a queue' do
expect(queue[0]).to receive(:delete).and_return(true)
expect(described_class).to receive(:perform).with('Foo', [10, 20]) expect(described_class).to receive(:perform)
.with('Foo', [10, 20], anything)
described_class.steal('Foo') described_class.steal('Foo')
end end
it 'does not steal jobs for a different migration' do it 'does not steal job that has already been taken' do
queue = [double(:job, args: ['Foo', [10, 20]])] expect(queue[0]).to receive(:delete).and_return(false)
allow(Sidekiq::Queue).to receive(:new) expect(described_class).not_to receive(:perform)
.with(BackgroundMigrationWorker.sidekiq_options['queue'])
.and_return(queue)
described_class.steal('Foo')
end
it 'does not steal jobs for a different migration' do
expect(described_class).not_to receive(:perform) expect(described_class).not_to receive(:perform)
expect(queue[0]).not_to receive(:delete) expect(queue[0]).not_to receive(:delete)
...@@ -31,16 +47,74 @@ describe Gitlab::BackgroundMigration do ...@@ -31,16 +47,74 @@ describe Gitlab::BackgroundMigration do
end end
end end
context 'when one of the jobs raises an error' do
let(:migration) { spy(:migration) }
let(:queue) do
[double(args: ['Foo', [10, 20]], queue: described_class.queue),
double(args: ['Foo', [20, 30]], queue: described_class.queue)]
end
before do
stub_const("#{described_class}::Foo", migration)
allow(queue[0]).to receive(:delete).and_return(true)
allow(queue[1]).to receive(:delete).and_return(true)
end
it 'enqueues the migration again and re-raises the error' do
allow(migration).to receive(:perform).with(10, 20)
.and_raise(Exception, 'Migration error').once
expect(BackgroundMigrationWorker).to receive(:perform_async)
.with('Foo', [10, 20]).once
expect { described_class.steal('Foo') }.to raise_error(Exception)
end
end
end
context 'when there are scheduled jobs present', :sidekiq, :redis do
it 'steals all jobs from the scheduled sets' do
Sidekiq::Testing.disable! do
BackgroundMigrationWorker.perform_in(10.minutes, 'Object')
expect(Sidekiq::ScheduledSet.new).to be_one
expect(described_class).to receive(:perform).with('Object', any_args)
described_class.steal('Object')
expect(Sidekiq::ScheduledSet.new).to be_none
end
end
end
context 'when there are enqueued and scheduled jobs present', :sidekiq, :redis do
it 'steals from the scheduled sets queue first' do
Sidekiq::Testing.disable! do
expect(described_class).to receive(:perform)
.with('Object', [1], anything).ordered
expect(described_class).to receive(:perform)
.with('Object', [2], anything).ordered
BackgroundMigrationWorker.perform_async('Object', [2])
BackgroundMigrationWorker.perform_in(10.minutes, 'Object', [1])
described_class.steal('Object')
end
end
end
end
describe '.perform' do describe '.perform' do
it 'performs a background migration' do let(:migration) { spy(:migration) }
instance = double(:instance)
klass = double(:klass, new: instance)
expect(described_class).to receive(:const_get) before do
.with('Foo') stub_const("#{described_class.name}::Foo", migration)
.and_return(klass) end
expect(instance).to receive(:perform).with(10, 20) it 'performs a background migration' do
expect(migration).to receive(:perform).with(10, 20).once
described_class.perform('Foo', [10, 20]) described_class.perform('Foo', [10, 20])
end end
......
...@@ -8,4 +8,8 @@ RSpec.configure do |config| ...@@ -8,4 +8,8 @@ RSpec.configure do |config|
config.after(:each, :sidekiq) do config.after(:each, :sidekiq) do
Sidekiq::Worker.clear_all Sidekiq::Worker.clear_all
end end
config.after(:each, :sidekiq, :redis) do
Sidekiq.redis { |redis| redis.flushdb }
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment