Commit e7531abb authored by Terri Chu's avatar Terri Chu Committed by Dmitry Gruzd

Change notes removal migration to use ES tasks

parent e397e0d5
---
title: Change notes removal migration to use ES tasks
merge_request: 61798
author:
type: changed
......@@ -16,25 +16,59 @@ class DeleteNotesFromOriginalIndex < Elastic::Migration
def migrate
retry_attempt = migration_state[:retry_attempt].to_i
task_id = migration_state[:task_id]
if retry_attempt >= MAX_ATTEMPTS
fail_migration_halt_error!(retry_attempt: retry_attempt)
return
end
if task_id
response = helper.task_status(task_id: task_id)
if response['completed']
log "Removing notes from the original index is completed for task_id:#{task_id}"
set_migration_state(
retry_attempt: retry_attempt,
task_id: nil
)
# since delete_by_query is using wait_for_completion = false, the task must be cleaned up
# in Elasticsearch system .tasks index
helper.client.delete(index: '.tasks', type: 'task', id: task_id)
else
log "Removing notes from the original index is still in progress for task_id:#{task_id}"
end
log_raise "Failed to delete notes: #{response['failures']}" if response['failures'].present?
return
end
if completed?
log "Skipping removing notes from the original index since it is already applied"
return
end
response = client.delete_by_query(index: helper.target_name, body: QUERY_BODY)
log "Launching delete by query"
response = client.delete_by_query(index: helper.target_name, body: QUERY_BODY, conflicts: 'proceed', wait_for_completion: false)
log_raise "Failed to delete notes: #{response['failures']}" if response['failures'].present?
log_raise "Failed to delete notes with task_id:#{task_id} - #{response['failures']}" if response['failures'].present?
task_id = response['task']
log "Removing notes from the original index is started with task_id:#{task_id}"
set_migration_state(
retry_attempt: retry_attempt,
task_id: task_id
)
rescue StandardError => e
log "migrate failed, increasing migration_state retry_attempt: #{retry_attempt} error:#{e.class}:#{e.message}"
set_migration_state(
retry_attempt: retry_attempt + 1
retry_attempt: retry_attempt + 1,
task_id: nil
)
raise e
......@@ -43,8 +77,8 @@ class DeleteNotesFromOriginalIndex < Elastic::Migration
def completed?
helper.refresh_index
results = client.search(index: helper.target_name, body: QUERY_BODY.merge(size: 0))
total_remaining = results.dig('hits', 'total', 'value')
results = client.count(index: helper.target_name, body: QUERY_BODY)
total_remaining = results.dig('count')
log "Checking to see if migration is completed based on index counts remaining:#{total_remaining}"
total_remaining == 0
......
......@@ -39,7 +39,23 @@ RSpec.describe DeleteNotesFromOriginalIndex, :elastic, :sidekiq_inline do
end
it 'removes notes from the index' do
expect { migration.migrate }.to change { migration.completed? }.from(false).to(true)
# initiate the task in Elasticsearch
migration.migrate
expect(migration.migration_state).to match(retry_attempt: 0, task_id: anything)
task_id = migration.migration_state[:task_id]
# the migration might not complete after the initial task is created
# so make sure it actually completes
50.times do |_| # Max 0.5s waiting
migration.migrate
break if migration.completed?
sleep 0.01
end
# verify clean up of the task from Elasticsearch
expect(migration.migration_state).to match(retry_attempt: 0, task_id: nil)
expect { helper.client.get(index: '.tasks', type: 'task', id: task_id) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
end
end
......@@ -60,7 +76,7 @@ RSpec.describe DeleteNotesFromOriginalIndex, :elastic, :sidekiq_inline do
migration.set_migration_state(retry_attempt: 1)
expect { migration.migrate }.to raise_error(StandardError)
expect(migration.migration_state).to match(retry_attempt: 2)
expect(migration.migration_state).to match(retry_attempt: 2, task_id: nil)
end
it 'fails the migration after too many attempts' do
......@@ -73,19 +89,37 @@ RSpec.describe DeleteNotesFromOriginalIndex, :elastic, :sidekiq_inline do
migration.migrate
expect(migration.migration_state).to match(retry_attempt: 2, halted: true, halted_indexing_unpaused: false)
expect(migration.migration_state).to match(retry_attempt: 2, task_id: nil, halted: true, halted_indexing_unpaused: false)
expect(client).not_to receive(:delete_by_query)
end
end
context 'es responds with errors' do
before do
allow(client).to receive(:delete_by_query).and_return('task' => 'task_1')
end
context 'when a task throws an error' do
before do
allow(helper).to receive(:task_status).and_return('failures' => ['failed'])
migration.migrate
end
it 'raises an error and increases retry attempt' do
expect { migration.migrate }.to raise_error(/Failed to delete notes/)
expect(migration.migration_state).to match(retry_attempt: 1, task_id: nil)
end
end
context 'when delete_by_query throws an error' do
before do
allow(client).to receive(:delete_by_query).and_return('failures' => ['failed'])
end
it 'raises an error and increases retry attempt' do
expect { migration.migrate }.to raise_error(/Failed to delete notes/)
expect(migration.migration_state).to match(retry_attempt: 1)
expect(migration.migration_state).to match(retry_attempt: 1, task_id: nil)
end
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment