Commit aac268e2 authored by Dmitry Gruzd's avatar Dmitry Gruzd Committed by Dylan Griffith

ES cluster reindexing feature

This MR automates steps for the cluster reindexing process
Elastic::ClusterReindexingService is the main class responsible for
the reindexing process
parent 8aeee371
......@@ -521,6 +521,11 @@ production: &base
elastic_index_initial_bulk_cron_worker:
cron: "*/1 * * * *"
# Elasticsearch reindexing worker
# NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_initial_bulk_cron_worker:
cron: "*/10 * * * *"
registry:
# enabled: true
# host: registry.example.com
......
......@@ -567,6 +567,9 @@ Gitlab.ee do
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker']['job_class'] ||= 'ElasticIndexInitialBulkCronWorker'
Settings.cron_jobs['elastic_cluster_reindexing_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_cluster_reindexing_cron_worker']['cron'] ||= '*/10 * * * *'
Settings.cron_jobs['elastic_cluster_reindexing_cron_worker']['job_class'] ||= 'ElasticClusterReindexingCronWorker'
Settings.cron_jobs['sync_seat_link_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['sync_seat_link_worker']['cron'] ||= "#{rand(60)} 0 * * *"
Settings.cron_jobs['sync_seat_link_worker']['job_class'] = 'SyncSeatLinkWorker'
......
# frozen_string_literal: true
class CreateElasticReindexingTask < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
create_table :elastic_reindexing_tasks do |t|
t.timestamps_with_timezone null: false
t.integer :documents_count
t.integer :state, null: false, default: 0, limit: 2, index: true
t.boolean :in_progress, null: false, default: true
t.text :index_name_from
t.text :index_name_to
t.text :elastic_task
t.text :error_message
end
add_text_limit :elastic_reindexing_tasks, :index_name_from, 255
add_text_limit :elastic_reindexing_tasks, :index_name_to, 255
add_text_limit :elastic_reindexing_tasks, :elastic_task, 255
add_text_limit :elastic_reindexing_tasks, :error_message, 255
add_index :elastic_reindexing_tasks, :in_progress, unique: true, where: 'in_progress'
end
def down
drop_table :elastic_reindexing_tasks
end
end
......@@ -11093,6 +11093,32 @@ CREATE SEQUENCE public.draft_notes_id_seq
ALTER SEQUENCE public.draft_notes_id_seq OWNED BY public.draft_notes.id;
CREATE TABLE public.elastic_reindexing_tasks (
id bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
documents_count integer,
state smallint DEFAULT 0 NOT NULL,
in_progress boolean DEFAULT true NOT NULL,
index_name_from text,
index_name_to text,
elastic_task text,
error_message text,
CONSTRAINT check_04151aca42 CHECK ((char_length(index_name_from) <= 255)),
CONSTRAINT check_7f64acda8e CHECK ((char_length(error_message) <= 255)),
CONSTRAINT check_85ebff7124 CHECK ((char_length(index_name_to) <= 255)),
CONSTRAINT check_942e5aae53 CHECK ((char_length(elastic_task) <= 255))
);
CREATE SEQUENCE public.elastic_reindexing_tasks_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE public.elastic_reindexing_tasks_id_seq OWNED BY public.elastic_reindexing_tasks.id;
CREATE TABLE public.elasticsearch_indexed_namespaces (
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
......@@ -16440,6 +16466,8 @@ ALTER TABLE ONLY public.diff_note_positions ALTER COLUMN id SET DEFAULT nextval(
ALTER TABLE ONLY public.draft_notes ALTER COLUMN id SET DEFAULT nextval('public.draft_notes_id_seq'::regclass);
ALTER TABLE ONLY public.elastic_reindexing_tasks ALTER COLUMN id SET DEFAULT nextval('public.elastic_reindexing_tasks_id_seq'::regclass);
ALTER TABLE ONLY public.emails ALTER COLUMN id SET DEFAULT nextval('public.emails_id_seq'::regclass);
ALTER TABLE ONLY public.environments ALTER COLUMN id SET DEFAULT nextval('public.environments_id_seq'::regclass);
......@@ -17413,6 +17441,9 @@ ALTER TABLE ONLY public.diff_note_positions
ALTER TABLE ONLY public.draft_notes
ADD CONSTRAINT draft_notes_pkey PRIMARY KEY (id);
ALTER TABLE ONLY public.elastic_reindexing_tasks
ADD CONSTRAINT elastic_reindexing_tasks_pkey PRIMARY KEY (id);
ALTER TABLE ONLY public.emails
ADD CONSTRAINT emails_pkey PRIMARY KEY (id);
......@@ -18926,6 +18957,10 @@ CREATE INDEX index_draft_notes_on_discussion_id ON public.draft_notes USING btre
CREATE INDEX index_draft_notes_on_merge_request_id ON public.draft_notes USING btree (merge_request_id);
CREATE UNIQUE INDEX index_elastic_reindexing_tasks_on_in_progress ON public.elastic_reindexing_tasks USING btree (in_progress) WHERE in_progress;
CREATE INDEX index_elastic_reindexing_tasks_on_state ON public.elastic_reindexing_tasks USING btree (state);
CREATE INDEX index_elasticsearch_indexed_namespaces_on_created_at ON public.elasticsearch_indexed_namespaces USING btree (created_at);
CREATE UNIQUE INDEX index_elasticsearch_indexed_namespaces_on_namespace_id ON public.elasticsearch_indexed_namespaces USING btree (namespace_id);
......@@ -23488,6 +23523,7 @@ COPY "schema_migrations" (version) FROM STDIN;
20200623000148
20200623000320
20200623121135
20200623141544
20200623170000
20200623185440
20200624075411
......
# frozen_string_literal: true
class Elastic::ReindexingTask < ApplicationRecord
self.table_name = 'elastic_reindexing_tasks'
enum state: {
initial: 0,
indexing_paused: 1,
reindexing: 2,
success: 10, # states less than 10 are considered in_progress
failure: 11
}
before_save :set_in_progress_flag
def self.current
where(in_progress: true).last
end
private
def set_in_progress_flag
in_progress_states = self.class.states.select { |_, v| v < 10 }.keys
self.in_progress = in_progress_states.include?(state)
end
end
# frozen_string_literal: true
module Elastic
class ClusterReindexingService
INITIAL_INDEX_OPTIONS = { # Optimized for writes
refresh_interval: -1, # Disable automatic refreshing
number_of_replicas: 0,
translog: { durability: 'async' }
}.freeze
def execute
case current_task.state.to_sym
when :initial
initial!
when :indexing_paused
indexing_paused!
when :reindexing
reindexing!
end
end
def current_task
Elastic::ReindexingTask.current
end
private
def default_index_options
{
refresh_interval: nil, # Change it back to the default
number_of_replicas: Gitlab::CurrentSettings.elasticsearch_replicas,
translog: { durability: 'request' }
}
end
def initial!
# Pause indexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true)
unless elastic_helper.alias_exists?
abort_reindexing!('Your Elasticsearch index must first use aliases before you can use this feature. Please recreate your index from scratch before reindexing.')
return false
end
expected_free_size = elastic_helper.index_size_bytes * 2
if elastic_helper.cluster_free_size_bytes < expected_free_size
abort_reindexing!("You should have at least #{expected_free_size} bytes of storage available to perform reindexing. Please increase the storage in your Elasticsearch cluster before reindexing.")
return false
end
current_task.update!(state: :indexing_paused)
true
end
def indexing_paused!
# Create an index with custom settings
index_name = elastic_helper.create_empty_index(with_alias: false, options: { settings: INITIAL_INDEX_OPTIONS })
# Record documents count
documents_count = elastic_helper.index_size.dig('docs', 'count')
# Trigger reindex
task_id = elastic_helper.reindex(to: index_name)
current_task.update!(
index_name_from: elastic_helper.target_index_name,
index_name_to: index_name,
documents_count: documents_count,
elastic_task: task_id,
state: :reindexing
)
true
end
def reindexing!
task = current_task
# Check if indexing is completed
task_status = elastic_helper.task_status(task_id: task.elastic_task)
return false unless task_status['completed']
# Check if reindexing is failed
reindexing_error = task_status.dig('error', 'type')
if reindexing_error
abort_reindexing!("Task #{task.elastic_task} has failed with Elasticsearch error.", additional_logs: { elasticsearch_error_type: reindexing_error })
return false
end
# Refresh a new index
elastic_helper.refresh_index(index_name: task.index_name_to)
# Compare documents count
old_documents_count = task.documents_count
new_documents_count = elastic_helper.index_size(index_name: task.index_name_to).dig('docs', 'count')
if old_documents_count != new_documents_count
abort_reindexing!("Documents count is different, Count from new index: #{new_documents_count} Count from original index: #{old_documents_count}. This likely means something went wrong during reindexing.")
return false
end
# Change index settings back
elastic_helper.update_settings(index_name: task.index_name_to, settings: default_index_options)
# Switch alias to a new index
elastic_helper.switch_alias(to: task.index_name_to)
# Unpause indexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false)
task.update!(state: :success)
true
end
def abort_reindexing!(reason, additional_logs: {})
error = { message: 'elasticsearch_reindex_error', error: reason, elasticsearch_task_id: current_task.elastic_task, gitlab_task_id: current_task.id, gitlab_task_state: current_task.state }
logger.error(error.merge(additional_logs))
current_task.update!(
state: :failure,
error_message: reason
)
# Unpause indexing
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false)
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
def elastic_helper
Gitlab::Elastic::Helper.default
end
end
end
......@@ -27,6 +27,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:elastic_cluster_reindexing_cron
:feature_category: :global_search
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:elastic_index_bulk_cron
:feature_category: :global_search
:has_external_dependencies:
......
# frozen_string_literal: true
class ElasticClusterReindexingCronWorker
include ApplicationWorker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
include Gitlab::ExclusiveLeaseHelpers
sidekiq_options retry: false
feature_category :global_search
urgency :throttled
idempotent!
def perform
task = Elastic::ReindexingTask.current
return false unless task
in_lock(self.class.name.underscore, ttl: 1.hour, retries: 10, sleep_sec: 1) do
service.execute
end
end
private
def service
Elastic::ClusterReindexingService.new
end
end
---
title: Add cluster reindexing feature to our ES integration
merge_request: 34069
author:
type: added
......@@ -29,11 +29,11 @@ module Gitlab
end
def create_empty_index(with_alias: true, options: {})
if index_exists?
raise "Index under '#{target_name}' already exists, use `recreate_index` to recreate it."
end
new_index_name = options[:index_name] || "#{target_name}-#{Time.now.strftime("%Y%m%d-%H%M")}"
new_index_name = with_alias ? "#{target_name}-#{Time.now.strftime("%Y%m%d-%H%M")}" : target_name
if with_alias ? index_exists? : index_exists?(index_name: new_index_name)
raise "Index under '#{with_alias ? target_name : new_index_name}' already exists, use `recreate_index` to recreate it."
end
settings = {}
mappings = {}
......@@ -75,18 +75,19 @@ module Gitlab
client.indices.create create_index_options
client.indices.put_alias(name: target_name, index: new_index_name) if with_alias
new_index_name
end
def delete_index
result = client.indices.delete(index: target_index_name)
def delete_index(index_name: nil)
result = client.indices.delete(index: index_name || target_index_name)
result['acknowledged']
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound => e
Gitlab::ErrorTracking.log_exception(e)
false
end
def index_exists?
client.indices.exists?(index: target_name) # rubocop:disable CodeReuse/ActiveRecord
def index_exists?(index_name: nil)
client.indices.exists?(index: index_name || target_name) # rubocop:disable CodeReuse/ActiveRecord
end
def alias_exists?
......@@ -96,15 +97,58 @@ module Gitlab
# Calls Elasticsearch refresh API to ensure data is searchable
# immediately.
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
def refresh_index
client.indices.refresh(index: target_name)
def refresh_index(index_name: nil)
client.indices.refresh(index: index_name || target_name)
end
def index_size(index_name: nil)
client.indices.stats['indices'][index_name || target_index_name]['total']
end
def index_size_bytes
index_size['store']['size_in_bytes']
end
def index_size
client.indices.stats['indices'][target_index_name]['total']
def cluster_free_size_bytes
client.cluster.stats['nodes']['fs']['free_in_bytes']
end
private
def reindex(from: target_index_name, to:, wait_for_completion: false)
body = {
source: {
index: from
},
dest: {
index: to
}
}
response = client.reindex(body: body, slices: 'auto', wait_for_completion: wait_for_completion)
response['task']
end
def task_status(task_id:)
client.tasks.get(task_id: task_id)
end
def update_settings(index_name: nil, settings:)
client.indices.put_settings(index: index_name || target_index_name, body: settings)
end
def switch_alias(from: target_index_name, to:)
actions = [
{
remove: { index: from, alias: target_name }
},
{
add: { index: to, alias: target_name }
}
]
body = { actions: actions }
client.indices.update_aliases(body: body)
end
# This method is used when we need to get an actual index name (if it's used through an alias)
def target_index_name
......
......@@ -76,6 +76,15 @@ namespace :gitlab do
Rake::Task["gitlab:elastic:create_empty_index"].invoke(*args)
end
desc "GitLab | Elasticsearch | Zero-downtime cluster reindexing"
task reindex_cluster: :environment do
Elastic::ReindexingTask.create!
ElasticClusterReindexingCronWorker.perform_async
puts "Reindexing job was successfully scheduled".color(:green)
end
desc "GitLab | Elasticsearch | Clear indexing status"
task clear_index_status: :environment do
IndexStatus.delete_all
......
# frozen_string_literal: true
FactoryBot.define do
factory :elastic_reindexing_task, class: 'Elastic::ReindexingTask' do
state { :initial }
in_progress { true }
index_name_from { 'old_index_name' }
index_name_to { 'new_index_name' }
end
end
......@@ -7,18 +7,18 @@ RSpec.describe Gitlab::Elastic::Helper do
shared_context 'with a legacy index' do
before do
helper.create_empty_index(with_alias: false)
@index_name = helper.create_empty_index(with_alias: false, options: { index_name: helper.target_name })
end
end
shared_context 'with an existing index and alias' do
before do
helper.create_empty_index(with_alias: true)
@index_name = helper.create_empty_index(with_alias: true)
end
end
after do
helper.delete_index
helper.delete_index(index_name: @index_name)
end
describe '.new' do
......@@ -41,21 +41,34 @@ RSpec.describe Gitlab::Elastic::Helper do
describe '#create_empty_index' do
context 'with an empty cluster' do
it 'creates index and alias' do
helper.create_empty_index
context 'with alias and index' do
include_context 'with an existing index and alias'
it 'creates index and alias' do
expect(helper.index_exists?).to eq(true)
expect(helper.alias_exists?).to eq(true)
end
end
it 'creates the index only' do
helper.create_empty_index(with_alias: false)
context 'when there is a legacy index' do
include_context 'with a legacy index'
it 'creates the index only' do
expect(helper.index_exists?).to eq(true)
expect(helper.alias_exists?).to eq(false)
end
end
it 'creates an index with a custom name' do
@index_name = 'test-custom-index-name'
helper.create_empty_index(with_alias: false, options: { index_name: @index_name })
expect(helper.index_exists?(index_name: @index_name)).to eq(true)
expect(helper.index_exists?).to eq(false)
end
end
context 'when there is an alias' do
include_context 'with an existing index and alias'
......@@ -134,4 +147,25 @@ RSpec.describe Gitlab::Elastic::Helper do
it { is_expected.to be_truthy }
end
end
describe '#cluster_free_size' do
it 'returns valid cluster size' do
expect(helper.cluster_free_size_bytes).to be_positive
end
end
describe '#switch_alias' do
include_context 'with an existing index and alias'
let(:new_index_name) { 'test-switch-alias' }
it 'switches the alias' do
helper.create_empty_index(with_alias: false, options: { index_name: new_index_name })
expect { helper.switch_alias(to: new_index_name) }
.to change { helper.target_index_name }.to(new_index_name)
helper.delete_index(index_name: new_index_name)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::ReindexingTask, type: :model do
it 'only allows one running task at a time' do
expect { create(:elastic_reindexing_task, state: :success) }.not_to raise_error
expect { create(:elastic_reindexing_task) }.not_to raise_error
expect { create(:elastic_reindexing_task) }.to raise_error(/violates unique constraint/)
end
it 'sets in_progress flag' do
task = create(:elastic_reindexing_task, state: :success)
expect(task.in_progress).to eq(false)
task.update!(state: :reindexing)
expect(task.in_progress).to eq(true)
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::ClusterReindexingService, :elastic do
subject { described_class.new }
context 'state: initial' do
let(:task) { create(:elastic_reindexing_task, state: :initial) }
it 'errors when there is not enough space' do
allow(Gitlab::Elastic::Helper.default).to receive(:index_size_bytes).and_return(100.megabytes)
allow(Gitlab::Elastic::Helper.default).to receive(:cluster_free_size_bytes).and_return(30.megabytes)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('failure')
expect(task.reload.error_message).to match(/storage available/)
end
it 'pauses elasticsearch indexing' do
expect(Gitlab::CurrentSettings.elasticsearch_pause_indexing).to eq(false)
expect { subject.execute }.to change { task.reload.state }.from('initial').to('indexing_paused')
expect(Gitlab::CurrentSettings.elasticsearch_pause_indexing).to eq(true)
end
end
context 'state: indexing_paused' do
it 'triggers reindexing' do
task = create(:elastic_reindexing_task, state: :indexing_paused)
allow(Gitlab::Elastic::Helper.default).to receive(:create_empty_index).and_return('new_index_name')
allow(Gitlab::Elastic::Helper.default).to receive(:reindex).and_return('task_id')
expect { subject.execute }.to change { task.reload.state }.from('indexing_paused').to('reindexing')
task = task.reload
expect(task.index_name_to).to eq('new_index_name')
expect(task.elastic_task).to eq('task_id')
end
end
context 'state: reindexing' do
let(:task) { create(:elastic_reindexing_task, state: :reindexing, documents_count: 10) }
let(:expected_default_settings) do
{
refresh_interval: nil,
number_of_replicas: Gitlab::CurrentSettings.elasticsearch_replicas,
translog: { durability: 'request' }
}
end
before do
allow(Gitlab::Elastic::Helper.default).to receive(:task_status).and_return({ 'completed' => true })
allow(Gitlab::Elastic::Helper.default).to receive(:refresh_index).and_return(true)
end
it 'errors if documents count is different' do
expect(Gitlab::Elastic::Helper.default).to receive(:index_size).and_return('docs' => { 'count' => task.reload.documents_count * 2 })
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/count is different/)
end
it 'errors if reindexing is failed' do
allow(Gitlab::Elastic::Helper.default).to receive(:task_status).and_return({ 'completed' => true, 'error' => { 'type' => 'search_phase_execution_exception' } })
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/has failed with/)
end
it 'launches all state steps' do
expect(Gitlab::Elastic::Helper.default).to receive(:index_size).and_return('docs' => { 'count' => task.reload.documents_count })
expect(Gitlab::Elastic::Helper.default).to receive(:update_settings).with(index_name: task.index_name_to, settings: expected_default_settings)
expect(Gitlab::Elastic::Helper.default).to receive(:switch_alias).with(to: task.index_name_to)
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false)
expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('success')
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ElasticClusterReindexingCronWorker do
subject { described_class.new }
describe '#perform' do
it 'calls execute method' do
expect(Elastic::ReindexingTask).to receive(:current).and_return(build(:elastic_reindexing_task))
expect_next_instance_of(Elastic::ClusterReindexingService) do |service|
expect(service).to receive(:execute).and_return(false)
end
subject.perform
end
it 'does nothing if no task is found' do
expect(Elastic::ReindexingTask).to receive(:current).and_return(nil)
expect(subject.perform).to eq(false)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment