Commit c0a6cdd0 authored by Dmitry Gruzd

Advanced Search: Remove legacy ReindexingService

To support reindexing processes that were already in progress when retry
support was introduced, the original service was renamed
LegacyReindexingService and kicked off whenever an Elastic::ReindexingSubtask
had its elastic_task field populated. This commit cleans up that logic by
removing the legacy service and all related specs.
parent f406ecea
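
For context, the worker dispatch removed by this commit looked roughly like the sketch below (a simplified illustration of the branch shown in the ElasticClusterReindexingCronWorker diff further down):

    # Before this change the cron worker fell back to the legacy service
    # whenever a subtask still carried an Elasticsearch task id.
    if task.subtasks.where.not(elastic_task: nil).any?
      Elastic::LegacyReindexingService.new.execute
    else
      Elastic::ClusterReindexingService.new.execute
    end
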
# frozen_string_literal: true
module Elastic
class LegacyReindexingService
include Gitlab::Utils::StrongMemoize
INITIAL_INDEX_OPTIONS = { # Optimized for writes
refresh_interval: '10s',
number_of_replicas: 0,
translog: { durability: 'async' }
}.freeze
DELETE_ORIGINAL_INDEX_AFTER = 14.days
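# Entry point called by the reindexing cron worker; advances the current task
# one step through its state machine on each run.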
def execute
case current_task.state.to_sym
when :initial
initial!
when :indexing_paused
indexing_paused!
when :reindexing
reindexing!
end
end
def current_task
strong_memoize(:elastic_current_task) do
Elastic::ReindexingTask.current
end
end
private
def alias_names
[elastic_helper.target_name] + elastic_helper.standalone_indices_proxies.map(&:index_name)
end
def default_index_options(alias_name:, index_name:)
{
refresh_interval: elastic_helper.get_settings(index_name: index_name).dig('refresh_interval'), # Use existing setting or nil for default
number_of_replicas: Elastic::IndexSetting[alias_name].number_of_replicas,
translog: { durability: 'request' }
}
end
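# State :initial — verify preconditions (no pending migrations, alias-based
# index, enough free cluster storage) and pause indexing.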
def initial!
if Elastic::DataMigrationService.pending_migrations?
# migrations may have paused indexing so we do not want to unpause when aborting the reindexing process
abort_reindexing!('You have unapplied advanced search migrations. Please wait until they are finished', unpause_indexing: false)
return false
end
# Pause indexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true)
unless elastic_helper.alias_exists?
abort_reindexing!('Your Elasticsearch index must first use aliases before you can use this feature. Please recreate your index from scratch before reindexing.')
return false
end
expected_free_size = alias_names.sum { |name| elastic_helper.index_size_bytes(index_name: name) } * 2
if elastic_helper.cluster_free_size_bytes < expected_free_size
abort_reindexing!("You should have at least #{expected_free_size} bytes of storage available to perform reindexing. Please increase the storage in your Elasticsearch cluster before reindexing.")
return false
end
current_task.update!(state: :indexing_paused)
true
end
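# State :indexing_paused — create new write-optimized indices and kick off an
# Elasticsearch reindex task per alias, recording each one as a subtask.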
def indexing_paused!
# Create indices with custom settings
main_index = elastic_helper.create_empty_index(with_alias: false, options: { settings: INITIAL_INDEX_OPTIONS })
standalone_indices = elastic_helper.create_standalone_indices(with_alias: false, options: { settings: INITIAL_INDEX_OPTIONS })
main_index.merge(standalone_indices).each do |new_index_name, alias_name|
old_index_name = elastic_helper.target_index_name(target: alias_name)
# Record documents count
documents_count = elastic_helper.documents_count(index_name: old_index_name)
# Trigger reindex
task_id = elastic_helper.reindex(from: old_index_name, to: new_index_name)
current_task.subtasks.create!(
alias_name: alias_name,
index_name_from: old_index_name,
index_name_to: new_index_name,
documents_count: documents_count,
elastic_task: task_id
)
end
current_task.update!(state: :reindexing)
true
end
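# Store the current document count of each target index on its subtask.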
def save_documents_count!(refresh:)
current_task.subtasks.each do |subtask|
elastic_helper.refresh_index(index_name: subtask.index_name_to) if refresh
new_documents_count = elastic_helper.documents_count(index_name: subtask.index_name_to)
subtask.update!(documents_count_target: new_documents_count)
end
end
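# Returns false while any reindex task is still running; aborts reindexing if
# a task reports an error or its status cannot be loaded.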
def check_task_status
save_documents_count!(refresh: false)
current_task.subtasks.each do |subtask|
task_status = elastic_helper.task_status(task_id: subtask.elastic_task)
return false unless task_status['completed']
reindexing_error = task_status.dig('error', 'type')
if reindexing_error
abort_reindexing!("Task #{subtask.elastic_task} has failed with Elasticsearch error.", additional_logs: { elasticsearch_error_type: reindexing_error })
return false
end
end
true
rescue Elasticsearch::Transport::Transport::Error
abort_reindexing!("Couldn't load task status")
false
end
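# Compare document counts between the original and new indices and abort
# reindexing on any mismatch.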
def compare_documents_count
save_documents_count!(refresh: true)
current_task.subtasks.each do |subtask|
old_documents_count = subtask.documents_count
new_documents_count = subtask.documents_count_target
if old_documents_count != new_documents_count
abort_reindexing!("Documents count is different, Count from new index: #{new_documents_count} Count from original index: #{old_documents_count}. This likely means something went wrong during reindexing.")
return false
end
end
true
end
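# Restore production settings (refresh interval, replica count, translog
# durability) on the new indices.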
def apply_default_index_options
current_task.subtasks.each do |subtask|
elastic_helper.update_settings(
index_name: subtask.index_name_to,
settings: default_index_options(alias_name: subtask.alias_name, index_name: subtask.index_name_from)
)
end
end
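# Repoint each alias from the original index to its newly built replacement.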
def switch_alias_to_new_index
current_task.subtasks.each do |subtask|
elastic_helper.switch_alias(from: subtask.index_name_from, to: subtask.index_name_to, alias_name: subtask.alias_name)
end
end
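# Resume indexing and schedule deletion of the original indices.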
def finalize_reindexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false)
current_task.update!(state: :success, delete_original_index_at: DELETE_ORIGINAL_INDEX_AFTER.from_now)
end
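# State :reindexing — once all tasks are done and counts match, restore
# settings, switch aliases and finish.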
def reindexing!
return false unless check_task_status
return false unless compare_documents_count
apply_default_index_options
switch_alias_to_new_index
finalize_reindexing
true
end
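# Mark the task as failed, log the reason, and optionally resume indexing.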
def abort_reindexing!(reason, additional_logs: {}, unpause_indexing: true)
error = { message: 'elasticsearch_reindex_error', error: reason, gitlab_task_id: current_task.id, gitlab_task_state: current_task.state }
logger.error(error.merge(additional_logs))
current_task.update!(
state: :failure,
error_message: reason
)
# Unpause indexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false) if unpause_indexing
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
def elastic_helper
Gitlab::Elastic::Helper.default
end
end
end
@@ -18,12 +18,7 @@ class ElasticClusterReindexingCronWorker
task = Elastic::ReindexingTask.current
break false unless task
# support currently running reindexing processes during an upgrade
if task.subtasks.where.not(elastic_task: nil).any? # rubocop: disable CodeReuse/ActiveRecord
legacy_service.execute
else
service.execute
end
service.execute
end
end
@@ -32,8 +27,4 @@ class ElasticClusterReindexingCronWorker
def service
Elastic::ClusterReindexingService.new
end
def legacy_service
Elastic::LegacyReindexingService.new
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::LegacyReindexingService, :elastic do
subject { described_class.new }
let(:helper) { Gitlab::Elastic::Helper.new }
before do
allow(Gitlab::Elastic::Helper).to receive(:default).and_return(helper)
end
context 'state: initial' do
let(:task) { create(:elastic_reindexing_task, state: :initial) }
it 'aborts if the main index does not use aliases' do
allow(helper).to receive(:alias_exists?).and_return(false)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('failure')
expect(task.reload.error_message).to match(/use aliases/)
end
it 'aborts if there are pending ES migrations' do
allow(Elastic::DataMigrationService).to receive(:pending_migrations?).and_return(true)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('failure')
expect(task.reload.error_message).to match(/unapplied advanced search migrations/)
end
it 'errors when there is not enough space' do
allow(helper).to receive(:index_size_bytes).and_return(100.megabytes)
allow(helper).to receive(:cluster_free_size_bytes).and_return(30.megabytes)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('failure')
expect(task.reload.error_message).to match(/storage available/)
end
it 'pauses elasticsearch indexing' do
expect(Gitlab::CurrentSettings.elasticsearch_pause_indexing).to eq(false)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('indexing_paused')
expect(Gitlab::CurrentSettings.elasticsearch_pause_indexing).to eq(true)
end
end
context 'state: indexing_paused' do
it 'triggers reindexing' do
task = create(:elastic_reindexing_task, state: :indexing_paused)
allow(helper).to receive(:create_empty_index).and_return('new_index_name' => 'new_index')
allow(helper).to receive(:create_standalone_indices).and_return('new_issues_name' => 'new_issues')
allow(helper).to receive(:reindex).with(from: anything, to: 'new_index_name').and_return('task_id_1')
allow(helper).to receive(:reindex).with(from: anything, to: 'new_issues_name').and_return('task_id_2')
expect { subject.execute }.to change { task.reload.state }.from('indexing_paused').to('reindexing')
subtasks = task.subtasks
expect(subtasks.count).to eq(2)
expect(subtasks.first.index_name_to).to eq('new_index_name')
expect(subtasks.first.elastic_task).to eq('task_id_1')
expect(subtasks.last.index_name_to).to eq('new_issues_name')
expect(subtasks.last.elastic_task).to eq('task_id_2')
end
end
context 'state: reindexing' do
let(:task) { create(:elastic_reindexing_task, state: :reindexing) }
let(:subtask) { create(:elastic_reindexing_subtask, elastic_reindexing_task: task, documents_count: 10) }
let(:refresh_interval) { nil }
let(:expected_default_settings) do
{
refresh_interval: refresh_interval,
number_of_replicas: Elastic::IndexSetting[subtask.alias_name].number_of_replicas,
translog: { durability: 'request' }
}
end
before do
allow(helper).to receive(:task_status).and_return({ 'completed' => true })
allow(helper).to receive(:refresh_index).and_return(true)
end
context 'errors are raised' do
before do
allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to).and_return(subtask.reload.documents_count * 2)
end
it 'errors if documents count is different' do
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/count is different/)
end
it 'errors if reindexing is failed' do
allow(helper).to receive(:task_status).and_return({ 'completed' => true, 'error' => { 'type' => 'search_phase_execution_exception' } })
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/has failed with/)
end
it 'errors if task is not found' do
allow(helper).to receive(:task_status).and_raise(Elasticsearch::Transport::Transport::Errors::NotFound)
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/couldn't load task status/i)
end
end
context 'task finishes correctly' do
using RSpec::Parameterized::TableSyntax
where(:refresh_interval, :current_settings) do
nil | {}
'60s' | { refresh_interval: '60s' }
end
with_them do
before do
allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to).and_return(subtask.reload.documents_count)
allow(helper).to receive(:get_settings).with(index_name: subtask.index_name_from).and_return(current_settings.with_indifferent_access)
end
it 'launches all state steps' do
expect(helper).to receive(:update_settings).with(index_name: subtask.index_name_to, settings: expected_default_settings)
expect(helper).to receive(:switch_alias).with(to: subtask.index_name_to, from: subtask.index_name_from, alias_name: subtask.alias_name)
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false)
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('success')
expect(task.reload.delete_original_index_at).to be_within(1.minute).of(described_class::DELETE_ORIGINAL_INDEX_AFTER.from_now)
end
end
end
end
end
@@ -22,18 +22,5 @@ RSpec.describe ElasticClusterReindexingCronWorker do
expect(subject.perform).to eq(false)
end
# support currently running reindexing processes during an upgrade
it 'calls the legacy service if subtask has elastic_task populated' do
task = create(:elastic_reindexing_task)
create(:elastic_reindexing_subtask, elastic_reindexing_task: task, elastic_task: 'test')
expect(Elastic::ReindexingTask).to receive(:current).and_return(task)
expect_next_instance_of(Elastic::LegacyReindexingService) do |service|
expect(service).to receive(:execute).and_return(false)
end
subject.perform
end
end
end