Commit c66a028a authored by Dmitry Gruzd, committed by Terri Chu

Add retry as a migration option to Elasticsearch migrations

parent b9b29214
...@@ -250,6 +250,11 @@ the migration runs and set it back to that value when the migration is completed
 will halt the migration if the storage required is not available when the migration runs. The migration must provide
 the space required in bytes by defining a `space_required_bytes` method.
+- `retry_on_failure` - Enable the retry on failure feature. By default, it retries
+  the migration 30 times. After it runs out of retries, the migration is marked as halted.
+  To customize the number of retries, pass the `max_attempts` argument:
+  `retry_on_failure max_attempts: 10`

 ```ruby
 # frozen_string_literal: true
...@@ -259,6 +264,7 @@ class BatchedMigrationName < Elastic::Migration
   throttle_delay 10.minutes
   pause_indexing!
   space_requirements!
+  retry_on_failure
   # ...
 end
......
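For context, the option documented above would be used like this in a migration that needs a smaller retry budget. This is a hedged sketch only: the class name and method bodies below are illustrative and are not part of this commit.

```ruby
# frozen_string_literal: true

# Hypothetical migration, shown only to illustrate `retry_on_failure max_attempts:`.
class ExampleCleanupMigration < Elastic::Migration
  batched!
  throttle_delay 3.minutes

  # Halt (and mark as failed) after 10 failed attempts instead of the default 30.
  retry_on_failure max_attempts: 10

  def migrate
    # ... batch work that may raise and be retried by Elastic::MigrationWorker ...
  end

  def completed?
    true
  end
end
```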
...@@ -4,10 +4,6 @@ module Elastic
   class MigrationRecord
     attr_reader :version, :name, :filename

-    delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, :pause_indexing?,
-      :space_requirements?, :space_required_bytes, :obsolete?, :batch_size,
-      to: :migration

     ELASTICSEARCH_SIZE = 25

     def initialize(version:, name:, filename:)
...@@ -50,11 +46,27 @@ module Elastic
       !!load_state&.dig('halted')
     end

-    def halt!(additional_options = {})
+    def failed?
+      !!load_state&.dig('failed')
+    end

+    def previous_attempts
+      load_state[:previous_attempts].to_i
+    end

+    def current_attempt
+      previous_attempts + 1
+    end

+    def halt(additional_options = {})
       state = { halted: true, halted_indexing_unpaused: false }.merge(additional_options)
       save_state!(state)
     end

+    def fail(additional_options = {})
+      halt(additional_options.merge(failed: true))
+    end

     def name_for_key
       name.underscore
     end
...@@ -67,6 +79,14 @@ module Elastic
       halted? || completed?
     end

+    def method_missing(method, *args, &block)
+      if migration.respond_to?(method)
+        migration.public_send(method, *args, &block) # rubocop: disable GitlabSecurity/PublicSend
+      else
+        super
+      end
+    end

     def self.load_versions(completed:)
       helper = Gitlab::Elastic::Helper.default
       helper.client
......
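The `delegate` list removed above is replaced by `method_missing` forwarding, so any method the underlying migration responds to (including the new `retry_on_failure?` and `max_attempts`) is reachable on the record without maintaining an explicit whitelist. A minimal standalone sketch of the same forwarding pattern follows; the `respond_to_missing?` override is the conventional companion to `method_missing` and is not part of the diff above.

```ruby
# Simplified stand-in for Elastic::MigrationRecord's delegation; illustration only.
class MigrationRecordSketch
  def initialize(migration)
    @migration = migration
  end

  # Forward unknown calls (e.g. batch_size, retry_on_failure?) to the wrapped migration.
  def method_missing(method, *args, &block)
    if @migration.respond_to?(method)
      @migration.public_send(method, *args, &block)
    else
      super
    end
  end

  # Keep respond_to? consistent with the forwarding above.
  def respond_to_missing?(method, include_private = false)
    @migration.respond_to?(method, include_private) || super
  end
end
```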
...@@ -7,6 +7,7 @@ module Elastic
     DEFAULT_THROTTLE_DELAY = 5.minutes
     DEFAULT_BATCH_SIZE = 1000
+    DEFAULT_MAX_ATTEMPTS = 30

     def batched?
       self.class.get_batched
...@@ -28,6 +29,14 @@
       self.class.get_space_requirements
     end

+    def retry_on_failure?
+      max_attempts.present?
+    end

+    def max_attempts
+      self.class.get_max_attempts
+    end

     class_methods do
       def space_requirements!
         class_attributes[:space_requirements] = true
...@@ -68,6 +77,14 @@
       def get_batch_size
         class_attributes[:batch_size] || DEFAULT_BATCH_SIZE
       end

+      def retry_on_failure(max_attempts: DEFAULT_MAX_ATTEMPTS)
+        class_attributes[:max_attempts] = max_attempts
+      end

+      def get_max_attempts
+        class_attributes[:max_attempts]
+      end
     end
   end
 end
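In short, `retry_on_failure` stores `max_attempts` in the class attributes, and `retry_on_failure?` only checks that the value is present. A rough usage sketch, mirroring the anonymous-class setup the specs further down appear to use (assumed, not taken verbatim from the repository):

```ruby
# Illustration only: an anonymous class including the concern, as in the specs below.
migration_class = Class.new do
  include Elastic::MigrationOptions
end

migration_class.new.retry_on_failure?  # => false, because max_attempts is nil by default

migration_class.retry_on_failure max_attempts: 5
migration_class.new.retry_on_failure?  # => true
migration_class.new.max_attempts       # => 5

migration_class.retry_on_failure       # without an argument, falls back to DEFAULT_MAX_ATTEMPTS (30)
```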
...@@ -50,7 +50,7 @@ module Elastic
       if free_size_bytes < space_required_bytes
         logger.warn "MigrationWorker: migration[#{migration.name}] You should have at least #{number_to_human_size(space_required_bytes)} of free space in the cluster to run this migration. Please increase the storage in your Elasticsearch cluster."
         logger.info "MigrationWorker: migration[#{migration.name}] updating with halted: true"
-        migration.halt!
+        migration.halt

         break false
       end
...@@ -73,7 +73,7 @@ module Elastic
     private

     def execute_migration(migration)
-      if migration.started? && !migration.batched?
+      if migration.started? && !migration.batched? && !migration.retry_on_failure?
         logger.info "MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete"

         return
...@@ -88,6 +88,22 @@ module Elastic
         logger.info "MigrationWorker: migration[#{migration.name}] kicking off next migration batch"
         Elastic::MigrationWorker.perform_in(migration.throttle_delay)
       end
+    rescue StandardError => e
+      retry_migration(migration, e) if migration.retry_on_failure?

+      raise e
+    end

+    def retry_migration(migration, exception)
+      if migration.current_attempt >= migration.max_attempts
+        message = "MigrationWorker: migration has failed with #{exception.class}:#{exception.message}, no retries left"
+        logger.error message

+        migration.fail(message: message)
+      else
+        logger.info "MigrationWorker: increasing previous_attempts to #{migration.current_attempt}"

+        migration.save_state!(previous_attempts: migration.current_attempt)
+      end
     end

     def pause_indexing!(migration)
......
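Taken together, each failed `migrate` call records one more attempt via `save_state!`, and once `current_attempt` (`previous_attempts + 1`) reaches `max_attempts` the worker calls `migration.fail`, which is `halt` plus `failed: true`; the original exception is re-raised either way so the failure still surfaces in Sidekiq. A standalone sketch of that bookkeeping, with simplified names rather than the real worker:

```ruby
# Illustration of the retry bookkeeping in retry_migration above; not the actual worker code.
def retry_or_fail(state, max_attempts:)
  previous_attempts = state[:previous_attempts].to_i
  current_attempt = previous_attempts + 1

  if current_attempt >= max_attempts
    # Out of retries: the migration is marked halted and failed.
    state.merge(halted: true, halted_indexing_unpaused: false, failed: true)
  else
    # Record this attempt so the next scheduled run picks up the count.
    state.merge(previous_attempts: current_attempt)
  end
end

retry_or_fail({}, max_attempts: 2)
# => { previous_attempts: 1 }
retry_or_fail({ previous_attempts: 1 }, max_attempts: 2)
# => { previous_attempts: 1, halted: true, halted_indexing_unpaused: false, failed: true }
```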
...@@ -3,8 +3,7 @@
 class DeleteNotesFromOriginalIndex < Elastic::Migration
   batched!
   throttle_delay 3.minutes
+  retry_on_failure

-  MAX_ATTEMPTS = 30

 QUERY_BODY = {
   query: {
...@@ -15,24 +14,15 @@ class DeleteNotesFromOriginalIndex < Elastic::Migration
 }.freeze

 def migrate
-    retry_attempt = migration_state[:retry_attempt].to_i
   task_id = migration_state[:task_id]

-    if retry_attempt >= MAX_ATTEMPTS
-      fail_migration_halt_error!(retry_attempt: retry_attempt)
-      return
-    end

   if task_id
     response = helper.task_status(task_id: task_id)

     if response['completed']
       log "Removing notes from the original index is completed for task_id:#{task_id}"
-        set_migration_state(
-          retry_attempt: retry_attempt,
-          task_id: nil
-        )
+        set_migration_state(task_id: nil)

       # since delete_by_query is using wait_for_completion = false, the task must be cleaned up
       # in Elasticsearch system .tasks index
...@@ -60,16 +50,10 @@ class DeleteNotesFromOriginalIndex < Elastic::Migration
   log "Removing notes from the original index is started with task_id:#{task_id}"

   set_migration_state(
-      retry_attempt: retry_attempt,
     task_id: task_id
   )
 rescue StandardError => e
-    log "migrate failed, increasing migration_state retry_attempt: #{retry_attempt} error:#{e.class}:#{e.message}"
-    set_migration_state(
-      retry_attempt: retry_attempt + 1,
-      task_id: nil
-    )
+    set_migration_state(task_id: nil)

   raise e
 end
......
...@@ -41,10 +41,10 @@ module Elastic
     Elastic::DataMigrationService[version]
   end

-  def fail_migration_halt_error!(retry_attempt: 0)
-    log "Halting migration on retry_attempt #{retry_attempt}"
-    migration_record.halt!(retry_attempt: retry_attempt)
+  def fail_migration_halt_error!(options = {})
+    log "Halting migration with #{options}"
+    migration_record.fail(options)
   end

   def log(message)
......
...@@ -133,7 +133,7 @@ RSpec.describe MigrateNotesToSeparateIndex do
     migration.migrate

-      expect(migration.migration_state).to match(slice: 0, max_slices: 2, retry_attempt: 30, halted: true, halted_indexing_unpaused: false)
+      expect(migration.migration_state).to match(slice: 0, max_slices: 2, retry_attempt: 30, halted: true, failed: true, halted_indexing_unpaused: false)
     expect(migration).not_to receive(:reindex)
   end
 end
......
...@@ -133,7 +133,7 @@ RSpec.describe MigrateMergeRequestsToSeparateIndex do
     migration.migrate

-      expect(migration.migration_state).to match(slice: 0, max_slices: 2, retry_attempt: 30, halted: true, halted_indexing_unpaused: false)
+      expect(migration.migration_state).to match(slice: 0, max_slices: 2, retry_attempt: 30, halted: true, failed: true, halted_indexing_unpaused: false)
     expect(migration).not_to receive(:reindex)
   end
 end
......
...@@ -88,7 +88,7 @@ RSpec.describe DeleteMergeRequestsFromOriginalIndex, :elastic, :sidekiq_inline do
     migration.migrate

-      expect(migration.migration_state).to match(retry_attempt: 2, task_id: nil, halted: true, halted_indexing_unpaused: false)
+      expect(migration.migration_state).to match(retry_attempt: 2, task_id: nil, halted: true, failed: true, halted_indexing_unpaused: false)
   end
 end
......
...@@ -41,7 +41,7 @@ RSpec.describe DeleteNotesFromOriginalIndex, :elastic, :sidekiq_inline do
   it 'removes notes from the index' do
     # initiate the task in Elasticsearch
     migration.migrate

-      expect(migration.migration_state).to match(retry_attempt: 0, task_id: anything)
+      expect(migration.migration_state).to match(task_id: anything)

     task_id = migration.migration_state[:task_id]

     # the migration might not complete after the initial task is created
...@@ -54,7 +54,7 @@ RSpec.describe DeleteNotesFromOriginalIndex, :elastic, :sidekiq_inline do
     end

     # verify clean up of the task from Elasticsearch
-      expect(migration.migration_state).to match(retry_attempt: 0, task_id: nil)
+      expect(migration.migration_state).to match(task_id: nil)
     expect { helper.client.get(index: '.tasks', type: 'task', id: task_id) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
   end
 end
...@@ -72,25 +72,11 @@ RSpec.describe DeleteNotesFromOriginalIndex, :elastic, :sidekiq_inline do
     allow(client).to receive(:delete_by_query).and_raise(StandardError)
   end

-    it 'increases retry_attempt' do
-      migration.set_migration_state(retry_attempt: 1)
+    it 'resets task_id' do
+      migration.set_migration_state(task_id: 'task_1')
     expect { migration.migrate }.to raise_error(StandardError)
-      expect(migration.migration_state).to match(retry_attempt: 2, task_id: nil)
+      expect(migration.migration_state).to match(task_id: nil)
   end

-    it 'fails the migration after too many attempts' do
-      stub_const('DeleteNotesFromOriginalIndex::MAX_ATTEMPTS', 2)

-      # run migration up to the set MAX_ATTEMPTS set in the migration
-      DeleteNotesFromOriginalIndex::MAX_ATTEMPTS.times do
-        expect { migration.migrate }.to raise_error(StandardError)
-      end

-      migration.migrate
-      expect(migration.migration_state).to match(retry_attempt: 2, task_id: nil, halted: true, halted_indexing_unpaused: false)
-      expect(client).not_to receive(:delete_by_query)
-    end
 end
...@@ -105,9 +91,9 @@ RSpec.describe DeleteNotesFromOriginalIndex, :elastic, :sidekiq_inline do
     migration.migrate
   end

-    it 'raises an error and increases retry attempt' do
+    it 'resets task_id' do
     expect { migration.migrate }.to raise_error(/Failed to delete notes/)
-      expect(migration.migration_state).to match(retry_attempt: 1, task_id: nil)
+      expect(migration.migration_state).to match(task_id: nil)
   end
 end
...@@ -116,9 +102,9 @@ RSpec.describe DeleteNotesFromOriginalIndex, :elastic, :sidekiq_inline do
     allow(client).to receive(:delete_by_query).and_return('failures' => ['failed'])
   end

-    it 'raises an error and increases retry attempt' do
+    it 'resets task_id' do
     expect { migration.migrate }.to raise_error(/Failed to delete notes/)
-      expect(migration.migration_state).to match(retry_attempt: 1, task_id: nil)
+      expect(migration.migration_state).to match(task_id: nil)
   end
 end
 end
......
...@@ -57,22 +57,30 @@ RSpec.describe Elastic::MigrationRecord, :elastic do
     end
   end

-  describe '#halt!' do
+  describe '#halt' do
     it 'sets state for halted and halted_indexing_unpaused' do
-      record.halt!
+      record.halt

       expect(record.load_from_index.dig('_source', 'state', 'halted')).to be_truthy
       expect(record.load_from_index.dig('_source', 'state', 'halted_indexing_unpaused')).to be_falsey
     end

     it 'sets state with additional options if passed' do
-      record.halt!(hello: 'world', good: 'bye')
+      record.halt(hello: 'world', good: 'bye')

       expect(record.load_from_index.dig('_source', 'state', 'hello')).to eq('world')
       expect(record.load_from_index.dig('_source', 'state', 'good')).to eq('bye')
     end
   end

+  describe '#fail' do
+    it 'calls halt with failed: true' do
+      expect(record).to receive(:halt).with(failed: true, foo: :bar)

+      record.fail(foo: :bar)
+    end
+  end

   describe '#started?' do
     it 'changes on object save' do
       expect { record.save!(completed: true) }.to change { record.started? }.from(false).to(true)
......
 # frozen_string_literal: true

-require 'spec_helper'
+require 'fast_spec_helper'

 RSpec.describe Elastic::MigrationOptions do
   let(:migration_class) do
...@@ -62,4 +62,34 @@ RSpec.describe Elastic::MigrationOptions do
       expect(subject).to eq(10000)
     end
   end

+  describe '#retry_on_failure?' do
+    subject { migration_class.new.retry_on_failure? }

+    it 'returns false when max_attempts is not set' do
+      expect(subject).to be_falsey
+    end

+    it 'returns true when max_attempts is set' do
+      migration_class.retry_on_failure

+      expect(subject).to be_truthy
+    end
+  end

+  describe '#max_attempts' do
+    subject { migration_class.new.max_attempts }

+    it 'returns default when retry_on_failure is set' do
+      migration_class.retry_on_failure

+      expect(subject).to eq(described_class::DEFAULT_MAX_ATTEMPTS)
+    end

+    it 'returns max_attempts when it is set' do
+      migration_class.retry_on_failure max_attempts: 1_000

+      expect(subject).to eq(1_000)
+    end
+  end
 end
...@@ -66,6 +66,30 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
       end
     end

+    context 'executing migration with retry_on_failure set' do
+      before do
+        allow(migration).to receive(:started?).and_return(true)
+        allow(migration).to receive(:retry_on_failure?).and_return(true)
+        allow(migration).to receive(:max_attempts).and_return(2)
+        allow(migration).to receive(:migrate).and_raise(StandardError)
+      end

+      it 'increases previous_attempts on failure' do
+        subject.perform

+        expect(migration.migration_state).to match(previous_attempts: 1)
+      end

+      it 'fails the migration if max_attempts is exceeded' do
+        migration.set_migration_state(previous_attempts: 2)
+        subject.perform

+        expect(migration.halted?).to be_truthy
+        expect(migration.failed?).to be_truthy
+      end
+    end

     context 'migration process' do
       before do
         allow(migration).to receive(:started?).and_return(started)
...@@ -151,7 +175,7 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
       it 'halts the migration if there is not enough space' do
         allow(helper).to receive(:cluster_free_size_bytes).and_return(5)
-        expect(migration).to receive(:halt!)
+        expect(migration).to receive(:halt)
         expect(migration).not_to receive(:migrate)

         subject.perform
...@@ -159,7 +183,7 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
       it 'runs the migration if there is enough space' do
         allow(helper).to receive(:cluster_free_size_bytes).and_return(20)
-        expect(migration).not_to receive(:halt!)
+        expect(migration).not_to receive(:fail)
         expect(migration).to receive(:migrate).once

         subject.perform
......