Commit b957737f authored by Dylan Griffith's avatar Dylan Griffith

Merge branch '234046-es-index-migrations' into 'master'

Add background migrations for Elasticsearch

See merge request gitlab-org/gitlab!46672
parents 241ab0c9 1c5b9011
......@@ -605,6 +605,9 @@ Gitlab.ee do
# Defaults for ElasticRemoveExpiredNamespaceSubscriptionsFromIndexCronWorker: 03:10 every day.
Settings.cron_jobs['elastic_remove_expired_namespace_subscriptions_from_index_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_remove_expired_namespace_subscriptions_from_index_cron_worker']['cron'] ||= '10 3 * * *'
Settings.cron_jobs['elastic_remove_expired_namespace_subscriptions_from_index_cron_worker']['job_class'] ||= 'ElasticRemoveExpiredNamespaceSubscriptionsFromIndexCronWorker'
# Defaults for Elastic::MigrationWorker: every 30 minutes.
Settings.cron_jobs['elastic_migration_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_migration_worker']['cron'] ||= '*/30 * * * *'
Settings.cron_jobs['elastic_migration_worker']['job_class'] ||= 'Elastic::MigrationWorker'
# Defaults for SyncSeatLinkWorker: a random minute within hour 0, every day.
# Unlike the entries above, job_class is assigned unconditionally (`=`, not `||=`).
Settings.cron_jobs['sync_seat_link_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['sync_seat_link_worker']['cron'] ||= "#{rand(60)} 0 * * *"
Settings.cron_jobs['sync_seat_link_worker']['job_class'] = 'SyncSeatLinkWorker'
......
......@@ -184,6 +184,38 @@ If the current version is `v12p1`, and we need to create a new version for `v12p
1. Change the namespace for files under `v12p1` folder from `Latest` to `V12p1`
1. Make changes to files under the `latest` folder as needed
## Creating a new Global Search migration
> This functionality was introduced by [#234046](https://gitlab.com/gitlab-org/gitlab/-/issues/234046).
NOTE: **Note:**
This is only supported for indices created with GitLab 13.0 or greater.
Migrations are stored in the [`ee/elastic/migrate/`](https://gitlab.com/gitlab-org/gitlab/-/tree/master/ee/elastic/migrate) folder with `YYYYMMDDHHMMSS_migration_name.rb`
file name format, which is similar to Rails database migrations:
```ruby
# frozen_string_literal: true
class MigrationName < Elastic::Migration
# Important: Any update to the Elastic index mappings should be replicated in Elastic::Latest::Config
def migrate
end
# Check if the migration has completed
# Return true if completed, otherwise return false
def completed?
end
end
```
Applied migrations are stored in the `gitlab-#{RAILS_ENV}-migrations` index. All unexecuted migrations
are applied sequentially by the [`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/blob/master/ee/app/workers/elastic/migration_worker.rb)
cron worker.
Any update to the Elastic index mappings should be replicated in [`Elastic::Latest::Config`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/lib/elastic/latest/config.rb).
## Performance Monitoring
### Prometheus
......
# frozen_string_literal: true
module Elastic
  # Represents one Elasticsearch migration and its bookkeeping document in the
  # migrations index.
  #
  # The document id is the migration +version+ and the document body holds a
  # single +completed+ flag. The migration class itself is loaded lazily from
  # +filename+ the first time one of the delegated methods is called.
  class MigrationRecord
    # Number of hits requested when listing persisted versions. Elasticsearch
    # returns only 10 hits when no size is given, which would silently hide
    # versions once more than 10 migrations share the same +completed+ state.
    # 10_000 matches the default `index.max_result_window`.
    MAX_PERSISTED_VERSIONS = 10_000

    attr_reader :version, :name, :filename

    # NOTE(review): `skip_migration?` is delegated here but is not defined on
    # Elastic::Migration in the visible source - confirm it exists before calling.
    delegate :migrate, :skip_migration?, :completed?, to: :migration

    # @param version [Integer] migration version, used as the document id
    # @param name [String] constant name of the migration class
    # @param filename [String, nil] path of the file defining the migration class
    def initialize(version:, name:, filename:)
      @version = version
      @name = name
      @filename = filename
      @migration = nil
    end

    # Writes (or overwrites) the bookkeeping document for this version.
    #
    # @param completed [Boolean] value stored in the `completed` field
    # @raise [RuntimeError] when the migrations index does not exist
    def save!(completed:)
      raise 'Migrations index is not found' unless helper.index_exists?(index_name: index_name)

      client.index index: index_name, type: '_doc', id: version, body: { completed: completed }
    end

    # @return [Boolean] whether a bookkeeping document exists for this version
    def persisted?
      load_from_index.present?
    end

    # @return [Hash, nil] the raw document, or nil when Elasticsearch reports NotFound
    def load_from_index
      client.get(index: index_name, id: version)
    rescue Elasticsearch::Transport::Transport::Errors::NotFound
      nil
    end

    # Lists the versions whose `completed` flag equals the given value.
    #
    # @param completed [Boolean] flag value to filter on
    # @return [Array<Integer>] matching versions; empty when the search raises NotFound
    def self.persisted_versions(completed:)
      helper = Gitlab::Elastic::Helper.default
      search_body = {
        query: { term: { completed: completed } },
        size: MAX_PERSISTED_VERSIONS
      }

      helper.client
        .search(index: helper.migrations_index_name, body: search_body)
        .dig('hits', 'hits')
        .map { |v| v['_id'].to_i }
    rescue Elasticsearch::Transport::Transport::Errors::NotFound
      []
    end

    private

    def migration
      @migration ||= load_migration
    end

    # Requires the migration file and instantiates the class named by +name+.
    def load_migration
      require(File.expand_path(filename))

      name.constantize.new version
    end

    def index_name
      helper.migrations_index_name
    end

    def client
      helper.client
    end

    def helper
      Gitlab::Elastic::Helper.default
    end
  end
end
# frozen_string_literal: true
module Elastic
  # Discovers Elasticsearch migration files on disk and exposes them as
  # Elastic::MigrationRecord objects.
  class DataMigrationService
    MIGRATIONS_PATH = 'ee/elastic/migrate'
    # Captures the numeric version and the snake_case name of a migration file.
    MIGRATION_REGEXP = /\A([0-9]+)_([_a-z0-9]*)\.rb\z/.freeze

    class << self
      # @return [Array<String>] paths of all migration files found on disk
      def migration_files
        Dir[migrations_full_path]
      end

      # @return [Array<Elastic::MigrationRecord>] one record per file, ordered by version
      def migrations
        records = migration_files.map { |path| build_record(path) }
        records.sort_by(&:version)
      end

      # @param name [String] snake_case migration name
      # @return [Boolean] true only when the stored document has a truthy `completed` field
      def migration_has_finished?(name)
        record = migrations.find { |candidate| candidate.name == name.camelize }
        document = record&.load_from_index

        document&.dig('_source', 'completed') ? true : false
      end

      # Saves every known migration with `completed: true`.
      def mark_all_as_completed!
        migrations.each do |record|
          record.save!(completed: true)
        end
      end

      private

      # Builds the record for a single migration file.
      def build_record(path)
        version, name = parse_migration_filename(path)

        Elastic::MigrationRecord.new(version: version.to_i, name: name.camelize, filename: path)
      end

      # @return [Array(String, String), nil] version and name captures, nil when the
      #   basename does not match MIGRATION_REGEXP
      def parse_migration_filename(filename)
        MIGRATION_REGEXP.match(File.basename(filename))&.captures
      end

      def migrations_full_path
        glob = Rails.root.join(MIGRATIONS_PATH, '**', '[0-9]*_*.rb')
        glob.to_s
      end
    end
  end
end
......@@ -59,6 +59,14 @@
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:elastic_migration
:feature_category: :global_search
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:elastic_remove_expired_namespace_subscriptions_from_index_cron
:feature_category: :global_search
:has_external_dependencies:
......
# frozen_string_literal: true
module Elastic
  # Cron worker that advances Elasticsearch migrations one step per run.
  #
  # Each run picks the first migration whose version is not recorded as
  # completed, runs its `migrate` method if it has no record yet, and then
  # stores the current result of `completed?` for it.
  class MigrationWorker
    include ApplicationWorker
    include Gitlab::ExclusiveLeaseHelpers
    # There is no onward scheduling and this cron handles work from across the
    # application, so there's no useful context to add.
    include CronjobQueue # rubocop:disable Scalability/CronWorkerContext

    feature_category :global_search
    idempotent!
    urgency :throttled

    # @return [false] when indexing is disabled, the alias is missing, or no
    #   migration is pending; otherwise whatever `in_lock` returns
    def perform
      return false unless Gitlab::CurrentSettings.elasticsearch_indexing? && helper.alias_exists?

      # The lock key is derived from the class name; the lease lasts one day.
      in_lock(self.class.name.underscore, ttl: 1.day, retries: 10, sleep_sec: 1) do
        pending = current_migration

        unless pending
          logger.info 'MigrationWorker: no migration available'
          break false
        end

        ensure_migrations_index
        execute_migration(pending)

        is_completed = pending.completed?
        logger.info "MigrationWorker: migration[#{pending.name}] updating with completed: #{is_completed}"
        pending.save!(completed: is_completed)
      end
    end

    private

    # Creates the migrations index when it does not exist yet.
    def ensure_migrations_index
      return if helper.index_exists?(index_name: helper.migrations_index_name)

      logger.info 'MigrationWorker: creating migrations index'
      helper.create_migrations_index
    end

    # Runs `migrate` only for migrations that have no record in the index yet.
    def execute_migration(migration)
      if migration.persisted?
        return logger.info("MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete")
      end

      logger.info "MigrationWorker: migration[#{migration.name}] executing migrate method"
      migration.migrate
    end

    # @return [Elastic::MigrationRecord, nil] first migration not recorded as completed
    def current_migration
      finished_versions = Elastic::MigrationRecord.persisted_versions(completed: true)

      Elastic::DataMigrationService.migrations.find do |candidate|
        !finished_versions.include?(candidate.version)
      end
    end

    def helper
      Gitlab::Elastic::Helper.default
    end

    def logger
      @logger ||= ::Gitlab::Elasticsearch::Logger.build
    end
  end
end
---
title: Add background migrations for Elasticsearch
merge_request: 46672
author:
type: added
# frozen_string_literal: true
# Sets the index-level `highlight.max_analyzed_offset` setting to the value
# derived from the `elasticsearch_indexed_file_size_limit_kb` application setting.
class ApplyMaxAnalyzedOffset < Elastic::Migration
  # Important: Any update to the Elastic index mappings should be replicated in Elastic::Latest::Config
  def migrate
    desired_offset = max_analyzed_offset_setting

    if desired_offset == current_max_analyzed_offset
      log "Skipping highlight.max_analyzed_offset migration since it is already applied"
      return
    end

    # The value has already been converted by `.kilobytes`, so it is a byte count.
    log "Setting highlight.max_analyzed_offset to #{desired_offset} bytes"
    helper.update_settings(settings: { index: { 'highlight.max_analyzed_offset': desired_offset } })
    log 'Update of highlight.max_analyzed_offset is completed'
  end

  # Check if the migration has completed
  # Return true if completed, otherwise return false
  def completed?
    max_analyzed_offset_setting == current_max_analyzed_offset
  end

  private

  # @return [Integer] the configured file size limit converted by `.kilobytes`
  def max_analyzed_offset_setting
    Gitlab::CurrentSettings.elasticsearch_indexed_file_size_limit_kb.kilobytes
  end

  # @return [Integer] value currently stored on the index (0 when the setting is absent)
  def current_max_analyzed_offset
    helper.get_settings.dig('highlight', 'max_analyzed_offset').to_i
  end
end
# frozen_string_literal: true
module Elastic
  # Abstract base class for Elasticsearch migrations.
  #
  # Subclasses must implement both #migrate and #completed?; the base
  # implementations raise NotImplementedError.
  class Migration
    attr_reader :version

    # @param version [Integer] version identifying this migration
    def initialize(version)
      @version = version
    end

    # Performs the migration. Must be overridden.
    #
    # @raise [NotImplementedError] always, in the base class
    def migrate
      raise_not_implemented
    end

    # Reports whether the migration has finished. Must be overridden.
    #
    # @raise [NotImplementedError] always, in the base class
    def completed?
      raise_not_implemented
    end

    private

    def raise_not_implemented
      raise NotImplementedError, 'Please extend Elastic::Migration'
    end

    # Memoized default Elasticsearch helper.
    def helper
      return @helper if @helper

      @helper = Gitlab::Elastic::Helper.default
    end

    def client
      helper.client
    end

    # Logs +message+ at info level, prefixed with the migration version.
    def log(message)
      logger.info("[Elastic::Migration: #{version}] #{message}")
    end

    def logger
      return @logger if @logger

      @logger = ::Gitlab::Elasticsearch::Logger.build
    end
  end
end
......@@ -52,6 +52,33 @@ module Gitlab
mappings.deep_merge(::Elastic::Latest::CustomLanguageAnalyzers.custom_analyzers_mappings)
end
# Name of the index that stores migration bookkeeping documents:
# the target name followed by a "-migrations" suffix.
def migrations_index_name
  format('%s-migrations', target_name)
end
# Creates the migrations index with a single shard and one boolean
# `completed` field.
#
# @return [String] the name of the migrations index
def create_migrations_index
  completed_mapping = { properties: { completed: { type: 'boolean' } } }

  request = {
    index: migrations_index_name,
    body: {
      settings: { number_of_shards: 1 },
      mappings: completed_mapping
    }
  }

  client.indices.create(request)

  migrations_index_name
end
def create_empty_index(with_alias: true, options: {})
new_index_name = options[:index_name] || "#{target_name}-#{Time.now.strftime("%Y%m%d-%H%M")}"
......@@ -149,6 +176,12 @@ module Gitlab
client.tasks.get(task_id: task_id)
end
# Fetches the `settings.index` section of an index.
#
# @param index_name [String, nil] index to query; falls back to target_index_name
# @return [Hash, nil] the nested `index` settings, looked up under the queried name
def get_settings(index_name: nil)
  resolved_index = index_name || target_index_name

  client.indices
        .get_settings(index: resolved_index)
        .dig(resolved_index, 'settings', 'index')
end
# Applies +settings+ to an index.
#
# @param index_name [String, nil] index to update; falls back to target_index_name
# @param settings [Hash] request body passed to `put_settings`
def update_settings(index_name: nil, settings:)
  destination = index_name || target_index_name

  client.indices.put_settings(index: destination, body: settings)
end
......
......@@ -68,6 +68,9 @@ namespace :gitlab do
helper = Gitlab::Elastic::Helper.new(target_name: args[:target_name])
index_name = helper.create_empty_index(with_alias: with_alias, options: options)
# `migrations_index_name` is a method on the helper; referencing it bare here
# would raise NameError because no such local or method exists in the task.
helper.create_migrations_index unless helper.index_exists?(index_name: helper.migrations_index_name)
# A freshly created index already has the latest mappings, so every known
# migration is recorded as completed.
# NOTE(review): MigrationRecord writes through Gitlab::Elastic::Helper.default,
# not through `helper` - confirm this is intended when a custom target_name is given.
::Elastic::DataMigrationService.mark_all_as_completed!
puts "Index '#{index_name}' has been created.".color(:green)
puts "Alias '#{helper.target_name}' → '#{index_name}' has been created".color(:green) if with_alias
end
......
......@@ -53,6 +53,18 @@ RSpec.describe Gitlab::Elastic::Helper do
end
end
# Verifies that the helper creates the dedicated migrations index.
describe '#create_migrations_index' do
# Remove the index created by the example once it has run.
after do
helper.delete_index(index_name: helper.migrations_index_name)
end
# The index must not exist beforehand and must exist after the call.
it 'creates the index' do
expect { helper.create_migrations_index }
.to change { helper.index_exists?(index_name: helper.migrations_index_name) }
.from(false).to(true)
end
end
describe '#create_empty_index' do
context 'with an empty cluster' do
context 'with alias and index' do
......
# frozen_string_literal: true
require 'spec_helper'
# Specs for the Elastic::Migration base class.
RSpec.describe Elastic::Migration, :elastic do
# Stand-in logger so log output can be asserted on.
let(:logger) { double('Gitlab::Elasticsearch::Logger') }
# Anonymous subclass whose #migrate exercises the private `log`, `client`
# and `helper` methods inherited from the base class.
let(:migration_class) do
Class.new(described_class) do
def migrate
log "number_of_nodes: #{client.cluster.health['number_of_nodes']}"
raise 'Index does not exist' unless helper.index_exists?(index_name: helper.migrations_index_name)
end
end
end
let(:version) { 20201105181100 }
let(:migration) { migration_class.new(version) }
# Instance of the base class itself, which does not implement #migrate.
let(:bare_migration) { described_class.new(version) }
before do
allow(::Gitlab::Elasticsearch::Logger).to receive(:build).and_return(logger)
end
describe '#migrate' do
# The subclass logs through the stubbed logger and finds the migrations index.
it 'executes method' do
expect(logger).to receive(:info).with(/number_of_nodes/)
expect { migration.migrate }.not_to raise_error
end
it 'raises exception for original class' do
expect { bare_migration.migrate }.to raise_error(NotImplementedError)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for Elastic::MigrationRecord persistence in the migrations index.
RSpec.describe Elastic::MigrationRecord, :elastic do
  let(:record) { described_class.new(version: Time.now.to_i, name: 'ExampleMigration', filename: nil) }

  describe '#save!' do
    # save! does not create the migrations index itself; it raises when the index is missing.
    it 'raises an error if the migrations index is not found' do
      es_helper.delete_index(index_name: @migrations_index_name)

      expect { record.save!(completed: true) }.to raise_error(/index is not found/)
    end
  end

  describe '#persisted?' do
    it 'changes on object save' do
      expect { record.save!(completed: true) }.to change { record.persisted? }.from(false).to(true)
    end
  end

  describe '.persisted_versions' do
    let(:completed_versions) { 1.upto(5).map { |i| described_class.new(version: i, name: i, filename: nil) } }
    let(:in_progress_migration) { described_class.new(version: 10, name: 10, filename: nil) }

    before do
      completed_versions.each { |migration| migration.save!(completed: true) }
      in_progress_migration.save!(completed: false)

      # Make the saved documents visible to the search below.
      es_helper.refresh_index(index_name: @migrations_index_name)
    end

    it 'loads all records' do
      expect(described_class.persisted_versions(completed: true)).to match_array(completed_versions.map(&:version))
      expect(described_class.persisted_versions(completed: false)).to contain_exactly(in_progress_migration.version)
    end

    it 'returns empty array if no index present' do
      es_helper.delete_index(index_name: @migrations_index_name)

      expect(described_class.persisted_versions(completed: true)).to eq([])
      expect(described_class.persisted_versions(completed: false)).to eq([])
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for the class-level API of Elastic::DataMigrationService.
RSpec.describe Elastic::DataMigrationService, :elastic do
subject { described_class }
describe '.migrations' do
# Runs against the real migration files: no two may share a class name.
it 'all migration names are unique' do
expect(subject.migrations.count).to eq(subject.migrations.map(&:name).uniq.count)
end
context 'migration_files stubbed' do
let(:migration_files) { %w(ee/elastic/migrate/20201105180000_example_migration.rb ee/elastic/migrate/20201201130000_example_migration.rb) }
before do
allow(subject).to receive(:migration_files).and_return(migration_files)
end
# Version and name are parsed from the file name; the first record is the lowest version.
it 'creates migration records' do
migrations = subject.migrations
migration = migrations.first
expect(migrations.count).to eq(2)
expect(migration.version).to eq(20201105180000)
expect(migration.name).to eq('ExampleMigration')
expect(migration.filename).to eq(migration_files.first)
end
end
end
describe '.migration_has_finished?' do
let(:migration) { subject.migrations.first }
let(:migration_name) { migration.name.underscore }
# Walks through the three states: no record, record with completed: false,
# record with completed: true. Only the last one counts as finished.
it 'returns true if migration has finished' do
expect(subject.migration_has_finished?(migration_name)).to eq(false)
migration.save!(completed: false)
refresh_index!
expect(subject.migration_has_finished?(migration_name)).to eq(false)
migration.save!(completed: true)
refresh_index!
expect(subject.migration_has_finished?(migration_name)).to eq(true)
end
end
describe 'mark_all_as_completed!' do
# Starts with no completed records and ends with one per known migration.
it 'creates all migration versions' do
expect(Elastic::MigrationRecord.persisted_versions(completed: true).count).to eq(0)
subject.mark_all_as_completed!
refresh_index!
expect(Elastic::MigrationRecord.persisted_versions(completed: true).count).to eq(subject.migrations.count)
end
end
end
......@@ -5,11 +5,13 @@ RSpec.configure do |config|
Elastic::ProcessBookkeepingService.clear_tracking!
Gitlab::Elastic::Helper.default.delete_index
Gitlab::Elastic::Helper.default.create_empty_index(options: { settings: { number_of_replicas: 0 } })
@migrations_index_name = Gitlab::Elastic::Helper.default.create_migrations_index
end
# Teardown for :elastic examples: delete the default index, clear bookkeeping
# tracking, then delete the migrations index by the name stored in
# @migrations_index_name.
config.after(:each, :elastic) do
Gitlab::Elastic::Helper.default.delete_index
Elastic::ProcessBookkeepingService.clear_tracking!
Gitlab::Elastic::Helper.default.delete_index(index_name: @migrations_index_name)
end
config.include ElasticsearchHelpers, :elastic
......
......@@ -35,6 +35,11 @@ module ElasticsearchHelpers
end
# Refreshes the default index and then the migrations index so that documents
# written by an example are visible to subsequent searches.
def refresh_index!
  es_helper.refresh_index
  es_helper.refresh_index(index_name: @migrations_index_name) # rubocop:disable Gitlab/ModuleWithInstanceVariables
end
# Shorthand for the default Elasticsearch helper.
def es_helper
Gitlab::Elastic::Helper.default
end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for Elastic::MigrationWorker#perform.
RSpec.describe Elastic::MigrationWorker, :elastic do
subject { described_class.new }
describe '#perform' do
context 'indexing is disabled' do
before do
stub_ee_application_setting(elasticsearch_indexing: false)
end
# With indexing off the worker returns early and never reaches execute_migration.
it 'returns without execution' do
expect(subject).not_to receive(:execute_migration)
expect(subject.perform).to be_falsey
end
end
context 'indexing is enabled' do
before do
stub_ee_application_setting(elasticsearch_indexing: true)
end
# The migrations index is removed first so the worker has to create it.
it 'creates an index if it does not exist' do
Gitlab::Elastic::Helper.default.delete_index(index_name: @migrations_index_name)
expect { subject.perform }.to change { Gitlab::Elastic::Helper.default.index_exists?(index_name: @migrations_index_name) }.from(false).to(true)
end
context 'no unexecuted migrations' do
before do
allow(subject).to receive(:current_migration).and_return(nil)
end
it 'skips execution' do
expect(subject).not_to receive(:execute_migration)
expect(subject.perform).to be_falsey
end
end
context 'migration process' do
let(:migration) { Elastic::DataMigrationService.migrations.first }
before do
allow(subject).to receive(:current_migration).and_return(migration)
allow(migration).to receive(:persisted?).and_return(persisted)
allow(migration).to receive(:completed?).and_return(completed)
end
using RSpec::Parameterized::TableSyntax
# Columns: is the record already persisted, does completed? return true,
# and is `migrate` expected to be called. `migrate` runs only for
# records that are not persisted yet.
where(:persisted, :completed, :execute_migration) do
false | false | true
false | true | true
true | false | false
true | true | false
end
with_them do
# save! is always expected, with whatever completed? returned.
it 'calls migration only when needed' do
if execute_migration
expect(migration).to receive(:migrate).once
else
expect(migration).not_to receive(:migrate)
end
expect(migration).to receive(:save!).with(completed: completed)
subject.perform
end
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment