Commit 6713248f authored by Steve Abrams's avatar Steve Abrams

Merge branch...

Merge branch '294210-gitlab-s-automatic-elasticsearch-reindexing-should-support-retries-for-manual-slicing' into 'master'

Advanced Search reindexing add support for retries

See merge request gitlab-org/gitlab!55681
parents eb16b8b2 31e29f02
---
title: Add support for retries to Advanced Search reindexing
merge_request: 55681
author:
type: changed
# frozen_string_literal: true
class RemoveElasticTaskNullConstraintFromElasticReindexingSubtasks < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
ELASTIC_TASK = 'elastic_task'
disable_ddl_transaction!
def up
remove_not_null_constraint :elastic_reindexing_subtasks, :elastic_task
change_column_null(:elastic_reindexing_subtasks, :elastic_task, true)
end
def down
# there may be elastic_task values which are null so we fill them with a dummy value
change_column_null(:elastic_reindexing_subtasks, :elastic_task, false, ELASTIC_TASK)
add_not_null_constraint :elastic_reindexing_subtasks, :elastic_task, validate: false
end
end
# frozen_string_literal: true
class CreateElasticReindexingSlices < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
disable_ddl_transaction!
class ReindexingSubtask < ActiveRecord::Base
self.table_name = 'elastic_reindexing_subtasks'
end
class ReindexingSlice < ActiveRecord::Base
self.table_name = 'elastic_reindexing_slices'
end
def up
unless table_exists?(:elastic_reindexing_slices)
create_table_with_constraints :elastic_reindexing_slices do |t|
t.timestamps_with_timezone null: false
t.references :elastic_reindexing_subtask, foreign_key: { on_delete: :cascade }, null: false, index: { name: 'idx_elastic_reindexing_slices_on_elastic_reindexing_subtask_id' }
t.integer :elastic_slice, null: false, limit: 2, default: 0
t.integer :elastic_max_slice, null: false, limit: 2, default: 0
t.integer :retry_attempt, null: false, limit: 2, default: 0
t.text :elastic_task
t.text_limit :elastic_task, 255
end
end
ReindexingSubtask.find_each do |subtask|
next if ReindexingSlice.where(elastic_reindexing_subtask_id: subtask.id).exists?
ReindexingSlice.create(
elastic_reindexing_subtask_id: subtask.id,
elastic_task: subtask.elastic_task,
retry_attempt: 0
)
end
end
def down
drop_table :elastic_reindexing_slices
end
end
943e415d3cc1090286aece96c013e54e4c07a7c16d8a7be82b560b1a3e36d513
\ No newline at end of file
d72c4cbd4d34fcfb3aae5aa11c583509b41499aa8bd107957934ab57d1756544
\ No newline at end of file
...@@ -12385,13 +12385,34 @@ CREATE SEQUENCE elastic_index_settings_id_seq ...@@ -12385,13 +12385,34 @@ CREATE SEQUENCE elastic_index_settings_id_seq
ALTER SEQUENCE elastic_index_settings_id_seq OWNED BY elastic_index_settings.id; ALTER SEQUENCE elastic_index_settings_id_seq OWNED BY elastic_index_settings.id;
CREATE TABLE elastic_reindexing_slices (
id bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
elastic_reindexing_subtask_id bigint NOT NULL,
elastic_slice smallint DEFAULT 0 NOT NULL,
elastic_max_slice smallint DEFAULT 0 NOT NULL,
retry_attempt smallint DEFAULT 0 NOT NULL,
elastic_task text,
CONSTRAINT check_ca30e1396e CHECK ((char_length(elastic_task) <= 255))
);
CREATE SEQUENCE elastic_reindexing_slices_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE elastic_reindexing_slices_id_seq OWNED BY elastic_reindexing_slices.id;
CREATE TABLE elastic_reindexing_subtasks ( CREATE TABLE elastic_reindexing_subtasks (
id bigint NOT NULL, id bigint NOT NULL,
elastic_reindexing_task_id bigint NOT NULL, elastic_reindexing_task_id bigint NOT NULL,
alias_name text NOT NULL, alias_name text NOT NULL,
index_name_from text NOT NULL, index_name_from text NOT NULL,
index_name_to text NOT NULL, index_name_to text NOT NULL,
elastic_task text NOT NULL, elastic_task text,
documents_count_target integer, documents_count_target integer,
documents_count integer, documents_count integer,
created_at timestamp with time zone NOT NULL, created_at timestamp with time zone NOT NULL,
...@@ -19505,6 +19526,8 @@ ALTER TABLE ONLY draft_notes ALTER COLUMN id SET DEFAULT nextval('draft_notes_id ...@@ -19505,6 +19526,8 @@ ALTER TABLE ONLY draft_notes ALTER COLUMN id SET DEFAULT nextval('draft_notes_id
ALTER TABLE ONLY elastic_index_settings ALTER COLUMN id SET DEFAULT nextval('elastic_index_settings_id_seq'::regclass); ALTER TABLE ONLY elastic_index_settings ALTER COLUMN id SET DEFAULT nextval('elastic_index_settings_id_seq'::regclass);
ALTER TABLE ONLY elastic_reindexing_slices ALTER COLUMN id SET DEFAULT nextval('elastic_reindexing_slices_id_seq'::regclass);
ALTER TABLE ONLY elastic_reindexing_subtasks ALTER COLUMN id SET DEFAULT nextval('elastic_reindexing_subtasks_id_seq'::regclass); ALTER TABLE ONLY elastic_reindexing_subtasks ALTER COLUMN id SET DEFAULT nextval('elastic_reindexing_subtasks_id_seq'::regclass);
ALTER TABLE ONLY elastic_reindexing_tasks ALTER COLUMN id SET DEFAULT nextval('elastic_reindexing_tasks_id_seq'::regclass); ALTER TABLE ONLY elastic_reindexing_tasks ALTER COLUMN id SET DEFAULT nextval('elastic_reindexing_tasks_id_seq'::regclass);
...@@ -20756,6 +20779,9 @@ ALTER TABLE ONLY draft_notes ...@@ -20756,6 +20779,9 @@ ALTER TABLE ONLY draft_notes
ALTER TABLE ONLY elastic_index_settings ALTER TABLE ONLY elastic_index_settings
ADD CONSTRAINT elastic_index_settings_pkey PRIMARY KEY (id); ADD CONSTRAINT elastic_index_settings_pkey PRIMARY KEY (id);
ALTER TABLE ONLY elastic_reindexing_slices
ADD CONSTRAINT elastic_reindexing_slices_pkey PRIMARY KEY (id);
ALTER TABLE ONLY elastic_reindexing_subtasks ALTER TABLE ONLY elastic_reindexing_subtasks
ADD CONSTRAINT elastic_reindexing_subtasks_pkey PRIMARY KEY (id); ADD CONSTRAINT elastic_reindexing_subtasks_pkey PRIMARY KEY (id);
...@@ -21930,6 +21956,8 @@ CREATE INDEX idx_deployment_clusters_on_cluster_id_and_kubernetes_namespace ON d ...@@ -21930,6 +21956,8 @@ CREATE INDEX idx_deployment_clusters_on_cluster_id_and_kubernetes_namespace ON d
CREATE INDEX idx_eaprpb_external_approval_rule_id ON external_approval_rules_protected_branches USING btree (external_approval_rule_id); CREATE INDEX idx_eaprpb_external_approval_rule_id ON external_approval_rules_protected_branches USING btree (external_approval_rule_id);
CREATE INDEX idx_elastic_reindexing_slices_on_elastic_reindexing_subtask_id ON elastic_reindexing_slices USING btree (elastic_reindexing_subtask_id);
CREATE UNIQUE INDEX idx_environment_merge_requests_unique_index ON deployment_merge_requests USING btree (environment_id, merge_request_id); CREATE UNIQUE INDEX idx_environment_merge_requests_unique_index ON deployment_merge_requests USING btree (environment_id, merge_request_id);
CREATE INDEX idx_geo_con_rep_updated_events_on_container_repository_id ON geo_container_repository_updated_events USING btree (container_repository_id); CREATE INDEX idx_geo_con_rep_updated_events_on_container_repository_id ON geo_container_repository_updated_events USING btree (container_repository_id);
...@@ -26589,6 +26617,9 @@ ALTER TABLE ONLY resource_milestone_events ...@@ -26589,6 +26617,9 @@ ALTER TABLE ONLY resource_milestone_events
ALTER TABLE ONLY namespace_root_storage_statistics ALTER TABLE ONLY namespace_root_storage_statistics
ADD CONSTRAINT fk_rails_a0702c430b FOREIGN KEY (namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE; ADD CONSTRAINT fk_rails_a0702c430b FOREIGN KEY (namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE;
ALTER TABLE ONLY elastic_reindexing_slices
ADD CONSTRAINT fk_rails_a17d86aeb9 FOREIGN KEY (elastic_reindexing_subtask_id) REFERENCES elastic_reindexing_subtasks(id) ON DELETE CASCADE;
ALTER TABLE ONLY project_aliases ALTER TABLE ONLY project_aliases
ADD CONSTRAINT fk_rails_a1804f74a7 FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE; ADD CONSTRAINT fk_rails_a1804f74a7 FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE;
# frozen_string_literal: true
class Elastic::ReindexingSlice < ApplicationRecord
self.table_name = 'elastic_reindexing_slices'
belongs_to :elastic_reindexing_subtask, class_name: 'Elastic::ReindexingSubtask'
validates :elastic_slice, :elastic_max_slice, :retry_attempt, presence: true, numericality: { greater_than_or_equal_to: 0 }
scope :started, -> { where.not(elastic_task: nil).order(elastic_slice: :asc) }
scope :not_started, -> { where(elastic_task: nil).order(elastic_slice: :asc) }
end
...@@ -5,5 +5,7 @@ class Elastic::ReindexingSubtask < ApplicationRecord ...@@ -5,5 +5,7 @@ class Elastic::ReindexingSubtask < ApplicationRecord
belongs_to :elastic_reindexing_task, class_name: 'Elastic::ReindexingTask' belongs_to :elastic_reindexing_task, class_name: 'Elastic::ReindexingTask'
validates :index_name_from, :index_name_to, :elastic_task, presence: true has_many :slices, class_name: 'Elastic::ReindexingSlice', foreign_key: :elastic_reindexing_subtask_id
validates :index_name_from, :index_name_to, presence: true
end end
...@@ -12,6 +12,10 @@ module Elastic ...@@ -12,6 +12,10 @@ module Elastic
DELETE_ORIGINAL_INDEX_AFTER = 14.days DELETE_ORIGINAL_INDEX_AFTER = 14.days
REINDEX_MAX_RETRY_LIMIT = 10
REINDEX_MAX_TOTAL_SLICES_RUNNING = 60
REINDEX_SLICE_MULTIPLIER = 2
def execute def execute
case current_task.state.to_sym case current_task.state.to_sym
when :initial when :initial
...@@ -78,18 +82,24 @@ module Elastic ...@@ -78,18 +82,24 @@ module Elastic
old_index_name = elastic_helper.target_index_name(target: alias_name) old_index_name = elastic_helper.target_index_name(target: alias_name)
# Record documents count # Record documents count
documents_count = elastic_helper.documents_count(index_name: old_index_name) documents_count = elastic_helper.documents_count(index_name: old_index_name)
# Trigger reindex # Create all subtasks
task_id = elastic_helper.reindex(from: old_index_name, to: new_index_name) subtask = current_task.subtasks.create!(
current_task.subtasks.create!(
alias_name: alias_name, alias_name: alias_name,
index_name_from: old_index_name, index_name_from: old_index_name,
index_name_to: new_index_name, index_name_to: new_index_name,
documents_count: documents_count, documents_count: documents_count
elastic_task: task_id
) )
max_slice = elastic_helper.get_settings(index_name: old_index_name).dig('number_of_shards').to_i * REINDEX_SLICE_MULTIPLIER
0.upto(max_slice - 1).to_a.each do |slice|
subtask.slices.create!(
elastic_max_slice: max_slice,
elastic_slice: slice
)
end
end end
trigger_reindexing_slices
current_task.update!(state: :reindexing) current_task.update!(state: :reindexing)
true true
...@@ -104,27 +114,79 @@ module Elastic ...@@ -104,27 +114,79 @@ module Elastic
end end
end end
def check_task_status def check_subtasks_and_reindex_slices
save_documents_count!(refresh: false) save_documents_count!(refresh: false)
current_task.subtasks.each do |subtask| slices_failed = 0
task_status = elastic_helper.task_status(task_id: subtask.elastic_task) slices_in_progress = 0
return false unless task_status['completed'] totals_do_not_match = 0
reindexing_error = task_status.dig('error', 'type') current_task.subtasks.each do |subtask|
if reindexing_error subtask.slices.started.each do |slice|
abort_reindexing!("Task #{subtask.elastic_task} has failed with Elasticsearch error.", additional_logs: { elasticsearch_error_type: reindexing_error }) # Get task status
return false task_status = elastic_helper.task_status(task_id: slice.elastic_task)
# Check if task is complete
slices_in_progress += 1 unless task_status['completed']
# Check for reindexing error
reindexing_error = task_status.dig('error', 'type')
if reindexing_error
slices_failed += 1
message = "Task #{slice.elastic_task} has failed with an Elasticsearch error: #{reindexing_error}."
retry_or_abort_after_limit(subtask, slice, message, { elasticsearch_error_type: reindexing_error, elastic_slice: slice.elastic_slice })
slices_in_progress += 1
next
end
# Check totals match if task complete
response = task_status['response']
if task_status['completed'] && response['total'] != (response['created'] + response['updated'] + response['deleted'])
message = "Task #{slice.elastic_task} total: #{response['total']} is not equal to updated: #{response['updated']} + created: #{response['created']} + deleted: #{response['deleted']}."
retry_or_abort_after_limit(subtask, slice, message, { elastic_slice: slice.elastic_slice })
slices_in_progress += 1
totals_do_not_match += 1
end
end end
end end
true # Kick off more reindexing slices
slices_in_progress = trigger_reindexing_slices(slices_in_progress)
slices_in_progress == 0 && slices_failed == 0 && totals_do_not_match == 0
rescue Elasticsearch::Transport::Transport::Error rescue Elasticsearch::Transport::Transport::Error
abort_reindexing!("Couldn't load task status") abort_reindexing!("Couldn't load task status")
false false
end end
def retry_or_abort_after_limit(subtask, slice, message, additional_logs)
if slice.retry_attempt < REINDEX_MAX_RETRY_LIMIT
retry_slice(subtask, slice, "#{message} Retrying." )
else
abort_reindexing!("#{message}. Retry limit reached. Aborting reindexing.", additional_logs: additional_logs)
end
end
def retry_slice(subtask, slice, message, additional_options = {})
warn = {
message: message,
gitlab_task_id: current_task.id,
gitlab_task_state: current_task.state,
gitlab_subtask_id: subtask.id,
gitlab_subtask_elastic_slice: slice.elastic_slice,
gitlab_subtask_elastic_task: slice.elastic_task
}.merge(additional_options)
logger.warn(warn)
task_id = elastic_helper.reindex(from: subtask.index_name_from, to: subtask.index_name_to, max_slice: slice.elastic_max_slice, slice: slice.elastic_slice)
retry_attempt = slice.retry_attempt + 1
logger.info(message: "Retrying (attempt #{retry_attempt}) reindex task #{task_id} from #{subtask.index_name_from} to #{subtask.index_name_to} started for slice #{slice.elastic_slice}.")
slice.update!(elastic_task: task_id, retry_attempt: retry_attempt)
end
def compare_documents_count def compare_documents_count
save_documents_count!(refresh: true) save_documents_count!(refresh: true)
...@@ -133,6 +195,7 @@ module Elastic ...@@ -133,6 +195,7 @@ module Elastic
new_documents_count = subtask.documents_count_target new_documents_count = subtask.documents_count_target
if old_documents_count != new_documents_count if old_documents_count != new_documents_count
abort_reindexing!("Documents count is different, Count from new index: #{new_documents_count} Count from original index: #{old_documents_count}. This likely means something went wrong during reindexing.") abort_reindexing!("Documents count is different, Count from new index: #{new_documents_count} Count from original index: #{old_documents_count}. This likely means something went wrong during reindexing.")
return false return false
end end
end end
...@@ -140,6 +203,23 @@ module Elastic ...@@ -140,6 +203,23 @@ module Elastic
true true
end end
def trigger_reindexing_slices(slices_in_progress = 0)
current_task.subtasks.each do |subtask|
slices_to_start = REINDEX_MAX_TOTAL_SLICES_RUNNING - slices_in_progress
break if slices_to_start == 0
subtask.slices.not_started.limit(slices_to_start).each do |slice|
task_id = elastic_helper.reindex(from: subtask.index_name_from, to: subtask.index_name_to, max_slice: slice.elastic_max_slice, slice: slice.elastic_slice)
logger.info(message: "Reindex task #{task_id} from #{subtask.index_name_from} to #{subtask.index_name_to} started for slice #{slice.elastic_slice}.")
slice.update!(elastic_task: task_id)
slices_in_progress += 1
end
end
slices_in_progress
end
def apply_default_index_options def apply_default_index_options
current_task.subtasks.each do |subtask| current_task.subtasks.each do |subtask|
elastic_helper.update_settings( elastic_helper.update_settings(
...@@ -162,7 +242,7 @@ module Elastic ...@@ -162,7 +242,7 @@ module Elastic
end end
def reindexing! def reindexing!
return false unless check_task_status return false unless check_subtasks_and_reindex_slices
return false unless compare_documents_count return false unless compare_documents_count
apply_default_index_options apply_default_index_options
......
# frozen_string_literal: true
module Elastic
class LegacyReindexingService
include Gitlab::Utils::StrongMemoize
INITIAL_INDEX_OPTIONS = { # Optimized for writes
refresh_interval: '10s',
number_of_replicas: 0,
translog: { durability: 'async' }
}.freeze
DELETE_ORIGINAL_INDEX_AFTER = 14.days
def execute
case current_task.state.to_sym
when :initial
initial!
when :indexing_paused
indexing_paused!
when :reindexing
reindexing!
end
end
def current_task
strong_memoize(:elastic_current_task) do
Elastic::ReindexingTask.current
end
end
private
def alias_names
[elastic_helper.target_name] + elastic_helper.standalone_indices_proxies.map(&:index_name)
end
def default_index_options(alias_name:, index_name:)
{
refresh_interval: elastic_helper.get_settings(index_name: index_name).dig('refresh_interval'), # Use existing setting or nil for default
number_of_replicas: Elastic::IndexSetting[alias_name].number_of_replicas,
translog: { durability: 'request' }
}
end
def initial!
if Elastic::DataMigrationService.pending_migrations?
# migrations may have paused indexing so we do not want to unpause when aborting the reindexing process
abort_reindexing!('You have unapplied advanced search migrations. Please wait until it is finished', unpause_indexing: false)
return false
end
# Pause indexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true)
unless elastic_helper.alias_exists?
abort_reindexing!('Your Elasticsearch index must first use aliases before you can use this feature. Please recreate your index from scratch before reindexing.')
return false
end
expected_free_size = alias_names.sum {|name| elastic_helper.index_size_bytes(index_name: name) } * 2
if elastic_helper.cluster_free_size_bytes < expected_free_size
abort_reindexing!("You should have at least #{expected_free_size} bytes of storage available to perform reindexing. Please increase the storage in your Elasticsearch cluster before reindexing.")
return false
end
current_task.update!(state: :indexing_paused)
true
end
def indexing_paused!
# Create indices with custom settings
main_index = elastic_helper.create_empty_index(with_alias: false, options: { settings: INITIAL_INDEX_OPTIONS })
standalone_indices = elastic_helper.create_standalone_indices(with_alias: false, options: { settings: INITIAL_INDEX_OPTIONS })
main_index.merge(standalone_indices).each do |new_index_name, alias_name|
old_index_name = elastic_helper.target_index_name(target: alias_name)
# Record documents count
documents_count = elastic_helper.documents_count(index_name: old_index_name)
# Trigger reindex
task_id = elastic_helper.reindex(from: old_index_name, to: new_index_name)
current_task.subtasks.create!(
alias_name: alias_name,
index_name_from: old_index_name,
index_name_to: new_index_name,
documents_count: documents_count,
elastic_task: task_id
)
end
current_task.update!(state: :reindexing)
true
end
def save_documents_count!(refresh:)
current_task.subtasks.each do |subtask|
elastic_helper.refresh_index(index_name: subtask.index_name_to) if refresh
new_documents_count = elastic_helper.documents_count(index_name: subtask.index_name_to)
subtask.update!(documents_count_target: new_documents_count)
end
end
def check_task_status
save_documents_count!(refresh: false)
current_task.subtasks.each do |subtask|
task_status = elastic_helper.task_status(task_id: subtask.elastic_task)
return false unless task_status['completed']
reindexing_error = task_status.dig('error', 'type')
if reindexing_error
abort_reindexing!("Task #{subtask.elastic_task} has failed with Elasticsearch error.", additional_logs: { elasticsearch_error_type: reindexing_error })
return false
end
end
true
rescue Elasticsearch::Transport::Transport::Error
abort_reindexing!("Couldn't load task status")
false
end
def compare_documents_count
save_documents_count!(refresh: true)
current_task.subtasks.each do |subtask|
old_documents_count = subtask.documents_count
new_documents_count = subtask.documents_count_target
if old_documents_count != new_documents_count
abort_reindexing!("Documents count is different, Count from new index: #{new_documents_count} Count from original index: #{old_documents_count}. This likely means something went wrong during reindexing.")
return false
end
end
true
end
def apply_default_index_options
current_task.subtasks.each do |subtask|
elastic_helper.update_settings(
index_name: subtask.index_name_to,
settings: default_index_options(alias_name: subtask.alias_name, index_name: subtask.index_name_from)
)
end
end
def switch_alias_to_new_index
current_task.subtasks.each do |subtask|
elastic_helper.switch_alias(from: subtask.index_name_from, to: subtask.index_name_to, alias_name: subtask.alias_name)
end
end
def finalize_reindexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false)
current_task.update!(state: :success, delete_original_index_at: DELETE_ORIGINAL_INDEX_AFTER.from_now)
end
def reindexing!
return false unless check_task_status
return false unless compare_documents_count
apply_default_index_options
switch_alias_to_new_index
finalize_reindexing
true
end
def abort_reindexing!(reason, additional_logs: {}, unpause_indexing: true)
error = { message: 'elasticsearch_reindex_error', error: reason, gitlab_task_id: current_task.id, gitlab_task_state: current_task.state }
logger.error(error.merge(additional_logs))
current_task.update!(
state: :failure,
error_message: reason
)
# Unpause indexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false) if unpause_indexing
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
def elastic_helper
Gitlab::Elastic::Helper.default
end
end
end
...@@ -18,7 +18,12 @@ class ElasticClusterReindexingCronWorker ...@@ -18,7 +18,12 @@ class ElasticClusterReindexingCronWorker
task = Elastic::ReindexingTask.current task = Elastic::ReindexingTask.current
break false unless task break false unless task
service.execute # support currently running reindexing processes during an upgrade
if task.subtasks.where.not(elastic_task: nil).any? # rubocop: disable CodeReuse/ActiveRecord
legacy_service.execute
else
service.execute
end
end end
end end
...@@ -27,4 +32,8 @@ class ElasticClusterReindexingCronWorker ...@@ -27,4 +32,8 @@ class ElasticClusterReindexingCronWorker
def service def service
Elastic::ClusterReindexingService.new Elastic::ClusterReindexingService.new
end end
def legacy_service
Elastic::LegacyReindexingService.new
end
end end
...@@ -199,17 +199,21 @@ module Gitlab ...@@ -199,17 +199,21 @@ module Gitlab
client.cluster.stats['nodes']['fs']['free_in_bytes'] client.cluster.stats['nodes']['fs']['free_in_bytes']
end end
def reindex(from: target_index_name, to:, wait_for_completion: false) def reindex(from: target_index_name, to:, max_slice:, slice:, wait_for_completion: false)
body = { body = {
source: { source: {
index: from index: from,
slice: {
id: slice,
max: max_slice
}
}, },
dest: { dest: {
index: to index: to
} }
} }
response = client.reindex(body: body, slices: 'auto', wait_for_completion: wait_for_completion) response = client.reindex(body: body, wait_for_completion: wait_for_completion)
response['task'] response['task']
end end
......
# frozen_string_literal: true
FactoryBot.define do
factory :elastic_reindexing_slice, class: 'Elastic::ReindexingSlice' do
association :elastic_reindexing_subtask
sequence(:elastic_slice) { |n| n - 1 }
sequence(:elastic_max_slice) { 5 }
end
end
...@@ -5,7 +5,6 @@ FactoryBot.define do ...@@ -5,7 +5,6 @@ FactoryBot.define do
association :elastic_reindexing_task, in_progress: false, state: :success association :elastic_reindexing_task, in_progress: false, state: :success
sequence(:index_name_from) { |n| "old_index_name_#{n}" } sequence(:index_name_from) { |n| "old_index_name_#{n}" }
sequence(:index_name_to) { |n| "new_index_name_#{n}" } sequence(:index_name_to) { |n| "new_index_name_#{n}" }
sequence(:elastic_task) { |n| "elastic_task_#{n}" }
sequence(:alias_name) { |n| "alias_name_#{n}" } sequence(:alias_name) { |n| "alias_name_#{n}" }
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::ReindexingSlice, type: :model do
describe 'relations' do
it { is_expected.to belong_to(:elastic_reindexing_subtask) }
end
describe 'validations' do
it { is_expected.to validate_presence_of(:elastic_slice) }
it { is_expected.to validate_presence_of(:elastic_max_slice) }
it { is_expected.to validate_presence_of(:retry_attempt) }
it { is_expected.to validate_numericality_of(:retry_attempt).is_greater_than_or_equal_to(0) }
it { is_expected.to validate_numericality_of(:elastic_slice).is_greater_than_or_equal_to(0) }
it { is_expected.to validate_numericality_of(:elastic_max_slice).is_greater_than_or_equal_to(0) }
end
end
...@@ -10,6 +10,5 @@ RSpec.describe Elastic::ReindexingSubtask, type: :model do ...@@ -10,6 +10,5 @@ RSpec.describe Elastic::ReindexingSubtask, type: :model do
describe 'validations' do describe 'validations' do
it { is_expected.to validate_presence_of(:index_name_from) } it { is_expected.to validate_presence_of(:index_name_from) }
it { is_expected.to validate_presence_of(:index_name_to) } it { is_expected.to validate_presence_of(:index_name_to) }
it { is_expected.to validate_presence_of(:elastic_task) }
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::LegacyReindexingService, :elastic do
subject { described_class.new }
let(:helper) { Gitlab::Elastic::Helper.new }
before do
allow(Gitlab::Elastic::Helper).to receive(:default).and_return(helper)
end
context 'state: initial' do
let(:task) { create(:elastic_reindexing_task, state: :initial) }
it 'aborts if the main index does not use aliases' do
allow(helper).to receive(:alias_exists?).and_return(false)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('failure')
expect(task.reload.error_message).to match(/use aliases/)
end
it 'aborts if there are pending ES migrations' do
allow(Elastic::DataMigrationService).to receive(:pending_migrations?).and_return(true)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('failure')
expect(task.reload.error_message).to match(/unapplied advanced search migrations/)
end
it 'errors when there is not enough space' do
allow(helper).to receive(:index_size_bytes).and_return(100.megabytes)
allow(helper).to receive(:cluster_free_size_bytes).and_return(30.megabytes)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('failure')
expect(task.reload.error_message).to match(/storage available/)
end
it 'pauses elasticsearch indexing' do
expect(Gitlab::CurrentSettings.elasticsearch_pause_indexing).to eq(false)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('indexing_paused')
expect(Gitlab::CurrentSettings.elasticsearch_pause_indexing).to eq(true)
end
end
context 'state: indexing_paused' do
it 'triggers reindexing' do
task = create(:elastic_reindexing_task, state: :indexing_paused)
allow(helper).to receive(:create_empty_index).and_return('new_index_name' => 'new_index')
allow(helper).to receive(:create_standalone_indices).and_return('new_issues_name' => 'new_issues')
allow(helper).to receive(:reindex).with(from: anything, to: 'new_index_name').and_return('task_id_1')
allow(helper).to receive(:reindex).with(from: anything, to: 'new_issues_name').and_return('task_id_2')
expect { subject.execute }.to change { task.reload.state }.from('indexing_paused').to('reindexing')
subtasks = task.subtasks
expect(subtasks.count).to eq(2)
expect(subtasks.first.index_name_to).to eq('new_index_name')
expect(subtasks.first.elastic_task).to eq('task_id_1')
expect(subtasks.last.index_name_to).to eq('new_issues_name')
expect(subtasks.last.elastic_task).to eq('task_id_2')
end
end
context 'state: reindexing' do
let(:task) { create(:elastic_reindexing_task, state: :reindexing) }
let(:subtask) { create(:elastic_reindexing_subtask, elastic_reindexing_task: task, documents_count: 10)}
let(:refresh_interval) { nil }
let(:expected_default_settings) do
{
refresh_interval: refresh_interval,
number_of_replicas: Elastic::IndexSetting[subtask.alias_name].number_of_replicas,
translog: { durability: 'request' }
}
end
before do
allow(helper).to receive(:task_status).and_return({ 'completed' => true })
allow(helper).to receive(:refresh_index).and_return(true)
end
context 'errors are raised' do
before do
allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to).and_return(subtask.reload.documents_count * 2)
end
it 'errors if documents count is different' do
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/count is different/)
end
it 'errors if reindexing is failed' do
allow(helper).to receive(:task_status).and_return({ 'completed' => true, 'error' => { 'type' => 'search_phase_execution_exception' } })
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/has failed with/)
end
it 'errors if task is not found' do
allow(helper).to receive(:task_status).and_raise(Elasticsearch::Transport::Transport::Errors::NotFound)
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/couldn't load task status/i)
end
end
context 'task finishes correctly' do
using RSpec::Parameterized::TableSyntax
where(:refresh_interval, :current_settings) do
nil | {}
'60s' | { refresh_interval: '60s' }
end
with_them do
before do
allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to).and_return(subtask.reload.documents_count)
allow(helper).to receive(:get_settings).with(index_name: subtask.index_name_from).and_return(current_settings.with_indifferent_access)
end
it 'launches all state steps' do
expect(helper).to receive(:update_settings).with(index_name: subtask.index_name_to, settings: expected_default_settings)
expect(helper).to receive(:switch_alias).with(to: subtask.index_name_to, from: subtask.index_name_from, alias_name: subtask.alias_name)
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false)
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('success')
expect(task.reload.delete_original_index_at).to be_within(1.minute).of(described_class::DELETE_ORIGINAL_INDEX_AFTER.from_now)
end
end
end
end
end
...@@ -115,11 +115,10 @@ RSpec.describe 'admin/application_settings/_elasticsearch_form' do ...@@ -115,11 +115,10 @@ RSpec.describe 'admin/application_settings/_elasticsearch_form' do
let!(:task) { create(:elastic_reindexing_task, state: :reindexing, error_message: 'error-message') } let!(:task) { create(:elastic_reindexing_task, state: :reindexing, error_message: 'error-message') }
let!(:subtask) { create(:elastic_reindexing_subtask, elastic_reindexing_task: task, documents_count_target: 5, documents_count: 10) } let!(:subtask) { create(:elastic_reindexing_subtask, elastic_reindexing_task: task, documents_count_target: 5, documents_count: 10) }
it 'renders the task' do it 'renders the task information' do
render render
expect(rendered).to include("Reindexing Status: #{task.state}") expect(rendered).to include("Reindexing Status: #{task.state}")
expect(rendered).to include("Task ID: #{subtask.elastic_task}")
expect(rendered).to include("Error: #{task.error_message}") expect(rendered).to include("Error: #{task.error_message}")
expect(rendered).to include("Expected documents: #{subtask.documents_count}") expect(rendered).to include("Expected documents: #{subtask.documents_count}")
expect(rendered).to include("Documents reindexed: #{subtask.documents_count_target} (50.0%)") expect(rendered).to include("Documents reindexed: #{subtask.documents_count_target} (50.0%)")
...@@ -130,11 +129,10 @@ RSpec.describe 'admin/application_settings/_elasticsearch_form' do ...@@ -130,11 +129,10 @@ RSpec.describe 'admin/application_settings/_elasticsearch_form' do
let!(:task) { create(:elastic_reindexing_task, state: :reindexing) } let!(:task) { create(:elastic_reindexing_task, state: :reindexing) }
let!(:subtask) { create(:elastic_reindexing_subtask, elastic_reindexing_task: task, documents_count: 10) } let!(:subtask) { create(:elastic_reindexing_subtask, elastic_reindexing_task: task, documents_count: 10) }
it 'renders the task' do it 'renders the task information' do
render render
expect(rendered).to include("Reindexing Status: #{task.state}") expect(rendered).to include("Reindexing Status: #{task.state}")
expect(rendered).to include("Task ID: #{subtask.elastic_task}")
expect(rendered).to include("Expected documents: #{subtask.documents_count}") expect(rendered).to include("Expected documents: #{subtask.documents_count}")
expect(rendered).not_to include("Error:") expect(rendered).not_to include("Error:")
expect(rendered).not_to include("Documents reindexed:") expect(rendered).not_to include("Documents reindexed:")
......
...@@ -22,5 +22,18 @@ RSpec.describe ElasticClusterReindexingCronWorker do ...@@ -22,5 +22,18 @@ RSpec.describe ElasticClusterReindexingCronWorker do
expect(subject.perform).to eq(false) expect(subject.perform).to eq(false)
end end
# support currently running reindexing processes during an upgrade
it 'calls the legacy service if subtask has elastic_task populated' do
task = create(:elastic_reindexing_task)
create(:elastic_reindexing_subtask, elastic_reindexing_task: task, elastic_task: 'test')
expect(Elastic::ReindexingTask).to receive(:current).and_return(task)
expect_next_instance_of(Elastic::LegacyReindexingService) do |service|
expect(service).to receive(:execute).and_return(false)
end
subject.perform
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment