Commit 134ee51f authored by Mayra Cabrera's avatar Mayra Cabrera

Merge branch '218428-finalize-partitioning-migration' into 'master'

Migration helpers to finalize partitioning migration

See merge request gitlab-org/gitlab!35201
parents 28eb2562 73c8eeb9
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
module Gitlab module Gitlab
module Database module Database
class BackgroundMigrationJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord class BackgroundMigrationJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
include EachBatch
self.table_name = :background_migration_jobs self.table_name = :background_migration_jobs
scope :for_migration_class, -> (class_name) { where(class_name: normalize_class_name(class_name)) } scope :for_migration_class, -> (class_name) { where(class_name: normalize_class_name(class_name)) }
......
...@@ -6,6 +6,7 @@ module Gitlab ...@@ -6,6 +6,7 @@ module Gitlab
module TableManagementHelpers module TableManagementHelpers
include ::Gitlab::Database::SchemaHelpers include ::Gitlab::Database::SchemaHelpers
include ::Gitlab::Database::DynamicModelHelpers include ::Gitlab::Database::DynamicModelHelpers
include ::Gitlab::Database::MigrationHelpers
include ::Gitlab::Database::Migrations::BackgroundMigrationHelpers include ::Gitlab::Database::Migrations::BackgroundMigrationHelpers
ALLOWED_TABLES = %w[audit_events].freeze ALLOWED_TABLES = %w[audit_events].freeze
...@@ -15,6 +16,12 @@ module Gitlab ...@@ -15,6 +16,12 @@ module Gitlab
BATCH_INTERVAL = 2.minutes.freeze BATCH_INTERVAL = 2.minutes.freeze
BATCH_SIZE = 50_000 BATCH_SIZE = 50_000
JobArguments = Struct.new(:start_id, :stop_id, :source_table_name, :partitioned_table_name, :source_column) do
def self.from_array(arguments)
self.new(*arguments)
end
end
# Creates a partitioned copy of an existing table, using a RANGE partitioning strategy on a timestamp column. # Creates a partitioned copy of an existing table, using a RANGE partitioning strategy on a timestamp column.
# One partition is created per month between the given `min_date` and `max_date`. Also installs a trigger on # One partition is created per month between the given `min_date` and `max_date`. Also installs a trigger on
# the original table to copy writes into the partitioned table. To copy over historic data from before creation # the original table to copy writes into the partitioned table. To copy over historic data from before creation
...@@ -132,6 +139,42 @@ module Gitlab ...@@ -132,6 +139,42 @@ module Gitlab
end end
end end
# Executes cleanup tasks from a previous BackgroundMigration to backfill a partitioned table by finishing
# pending jobs and performing a final data synchronization.
# This performs two steps:
# 1. Wait to finish any pending BackgroundMigration jobs that have not succeeded
# 2. Inline copy any missed rows from the original table to the partitioned table
#
# **NOTE** Migrations using this method cannot be scheduled in the same release as the migration that
# schedules the background migration using the `enqueue_background_migration` helper, or else the
# background migration jobs will be force-executed.
#
# Example:
#
# finalize_backfilling_partitioned_table :audit_events
#
def finalize_backfilling_partitioned_table(table_name)
assert_table_is_allowed(table_name)
assert_not_in_transaction_block(scope: ERROR_SCOPE)
partitioned_table_name = make_partitioned_table_name(table_name)
unless table_exists?(partitioned_table_name)
raise "could not find partitioned table for #{table_name}, " \
"this could indicate the previous partitioning migration has been rolled back."
end
Gitlab::BackgroundMigration.steal(MIGRATION_CLASS_NAME) do |raw_arguments|
JobArguments.from_array(raw_arguments).source_table_name == table_name.to_s
end
primary_key = connection.primary_key(table_name)
copy_missed_records(table_name, partitioned_table_name, primary_key)
disable_statement_timeout do
execute("VACUUM FREEZE ANALYZE #{partitioned_table_name}")
end
end
private private
def assert_table_is_allowed(table_name) def assert_table_is_allowed(table_name)
...@@ -284,7 +327,7 @@ module Gitlab ...@@ -284,7 +327,7 @@ module Gitlab
create_trigger(table_name, trigger_name, function_name, fires: 'AFTER INSERT OR UPDATE OR DELETE') create_trigger(table_name, trigger_name, function_name, fires: 'AFTER INSERT OR UPDATE OR DELETE')
end end
def enqueue_background_migration(source_table_name, partitioned_table_name, source_key) def enqueue_background_migration(source_table_name, partitioned_table_name, source_column)
source_model = define_batchable_model(source_table_name) source_model = define_batchable_model(source_table_name)
queue_background_migration_jobs_by_range_at_intervals( queue_background_migration_jobs_by_range_at_intervals(
...@@ -292,13 +335,35 @@ module Gitlab ...@@ -292,13 +335,35 @@ module Gitlab
MIGRATION_CLASS_NAME, MIGRATION_CLASS_NAME,
BATCH_INTERVAL, BATCH_INTERVAL,
batch_size: BATCH_SIZE, batch_size: BATCH_SIZE,
other_job_arguments: [source_table_name.to_s, partitioned_table_name, source_key], other_job_arguments: [source_table_name.to_s, partitioned_table_name, source_column],
track_jobs: true) track_jobs: true)
end end
def cleanup_migration_jobs(table_name) def cleanup_migration_jobs(table_name)
::Gitlab::Database::BackgroundMigrationJob.for_partitioning_migration(MIGRATION_CLASS_NAME, table_name).delete_all ::Gitlab::Database::BackgroundMigrationJob.for_partitioning_migration(MIGRATION_CLASS_NAME, table_name).delete_all
end end
def copy_missed_records(source_table_name, partitioned_table_name, source_column)
backfill_table = BackfillPartitionedTable.new
relation = ::Gitlab::Database::BackgroundMigrationJob.pending
.for_partitioning_migration(MIGRATION_CLASS_NAME, source_table_name)
relation.each_batch do |batch|
batch.each do |pending_migration_job|
job_arguments = JobArguments.from_array(pending_migration_job.arguments)
start_id = job_arguments.start_id
stop_id = job_arguments.stop_id
say("Backfilling data into partitioned table for ids from #{start_id} to #{stop_id}")
job_updated_count = backfill_table.perform(start_id, stop_id, source_table_name,
partitioned_table_name, source_column)
unless job_updated_count > 0
raise "failed to update tracking record for ids from #{start_id} to #{stop_id}"
end
end
end
end
end end
end end
end end
......
...@@ -71,6 +71,15 @@ RSpec.describe Gitlab::Database::BackgroundMigrationJob do ...@@ -71,6 +71,15 @@ RSpec.describe Gitlab::Database::BackgroundMigrationJob do
expect(job4.reload).to be_pending expect(job4.reload).to be_pending
end end
it 'returns the number of jobs updated' do
expect(described_class.succeeded.count).to eq(0)
jobs_updated = described_class.mark_all_as_succeeded('::TestJob', [1, 100])
expect(jobs_updated).to eq(2)
expect(described_class.succeeded.count).to eq(2)
end
context 'when previous matching jobs have already succeeded' do context 'when previous matching jobs have already succeeded' do
let(:initial_time) { Time.now.round } let(:initial_time) { Time.now.round }
let!(:job1) { create(:background_migration_job, :succeeded, created_at: initial_time, updated_at: initial_time) } let!(:job1) { create(:background_migration_job, :succeeded, created_at: initial_time, updated_at: initial_time) }
......
...@@ -107,6 +107,15 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -107,6 +107,15 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
end.to change { ::Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1) end.to change { ::Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1)
end end
it 'returns the number of job records marked as succeeded' do
create(:background_migration_job, class_name: "::#{described_class.name}",
arguments: [source1.id, source3.id, source_table, destination_table, unique_key])
jobs_updated = subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(jobs_updated).to eq(1)
end
context 'when the feature flag is disabled' do context 'when the feature flag is disabled' do
let(:mock_connection) { double('connection') } let(:mock_connection) { double('connection') }
......
...@@ -480,6 +480,153 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHe ...@@ -480,6 +480,153 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHe
end end
end end
describe '#finalize_backfilling_partitioned_table' do
let(:source_table) { 'todos' }
let(:source_column) { 'id' }
context 'when the table is not allowed' do
let(:source_table) { :this_table_is_not_allowed }
it 'raises an error' do
expect(migration).to receive(:assert_table_is_allowed).with(source_table).and_call_original
expect do
migration.finalize_backfilling_partitioned_table source_table
end.to raise_error(/#{source_table} is not allowed for use/)
end
end
context 'when the partitioned table does not exist' do
it 'raises an error' do
expect(migration).to receive(:table_exists?).with(partitioned_table).and_return(false)
expect do
migration.finalize_backfilling_partitioned_table source_table
end.to raise_error(/could not find partitioned table for #{source_table}/)
end
end
context 'finishing pending background migration jobs' do
let(:source_table_double) { double('table name') }
let(:raw_arguments) { [1, 50_000, source_table_double, partitioned_table, source_column] }
before do
allow(migration).to receive(:table_exists?).with(partitioned_table).and_return(true)
allow(migration).to receive(:copy_missed_records)
allow(migration).to receive(:execute).with(/VACUUM/)
end
it 'finishes remaining jobs for the correct table' do
expect_next_instance_of(described_class::JobArguments) do |job_arguments|
expect(job_arguments).to receive(:source_table_name).and_call_original
end
expect(Gitlab::BackgroundMigration).to receive(:steal)
.with(described_class::MIGRATION_CLASS_NAME)
.and_yield(raw_arguments)
expect(source_table_double).to receive(:==).with(source_table.to_s)
migration.finalize_backfilling_partitioned_table source_table
end
end
context 'when there is missed data' do
let(:partitioned_model) { Class.new(ActiveRecord::Base) }
let(:timestamp) { Time.utc(2019, 12, 1, 12).round }
let!(:todo1) { create(:todo, created_at: timestamp, updated_at: timestamp) }
let!(:todo2) { create(:todo, created_at: timestamp, updated_at: timestamp) }
let!(:todo3) { create(:todo, created_at: timestamp, updated_at: timestamp) }
let!(:todo4) { create(:todo, created_at: timestamp, updated_at: timestamp) }
let!(:pending_job1) do
create(:background_migration_job,
class_name: described_class::MIGRATION_CLASS_NAME,
arguments: [todo1.id, todo2.id, source_table, partitioned_table, source_column])
end
let!(:pending_job2) do
create(:background_migration_job,
class_name: described_class::MIGRATION_CLASS_NAME,
arguments: [todo3.id, todo3.id, source_table, partitioned_table, source_column])
end
let!(:succeeded_job) do
create(:background_migration_job, :succeeded,
class_name: described_class::MIGRATION_CLASS_NAME,
arguments: [todo4.id, todo4.id, source_table, partitioned_table, source_column])
end
before do
partitioned_model.primary_key = :id
partitioned_model.table_name = partitioned_table
allow(migration).to receive(:queue_background_migration_jobs_by_range_at_intervals)
migration.partition_table_by_date source_table, partition_column, min_date: min_date, max_date: max_date
allow(Gitlab::BackgroundMigration).to receive(:steal)
allow(migration).to receive(:execute).with(/VACUUM/)
end
it 'idempotently cleans up after failed background migrations' do
expect(partitioned_model.count).to eq(0)
partitioned_model.insert!(todo2.attributes)
expect_next_instance_of(Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable) do |backfill|
allow(backfill).to receive(:transaction_open?).and_return(false)
expect(backfill).to receive(:perform)
.with(todo1.id, todo2.id, source_table, partitioned_table, source_column)
.and_call_original
expect(backfill).to receive(:perform)
.with(todo3.id, todo3.id, source_table, partitioned_table, source_column)
.and_call_original
end
migration.finalize_backfilling_partitioned_table source_table
expect(partitioned_model.count).to eq(3)
[todo1, todo2, todo3].each do |original|
copy = partitioned_model.find(original.id)
expect(copy.attributes).to eq(original.attributes)
end
expect(partitioned_model.find_by_id(todo4.id)).to be_nil
[pending_job1, pending_job2].each do |job|
expect(job.reload).to be_succeeded
end
end
it 'raises an error if no job tracking records are marked as succeeded' do
expect_next_instance_of(Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable) do |backfill|
allow(backfill).to receive(:transaction_open?).and_return(false)
expect(backfill).to receive(:perform).and_return(0)
end
expect do
migration.finalize_backfilling_partitioned_table source_table
end.to raise_error(/failed to update tracking record/)
end
it 'vacuums the table after loading is complete' do
expect_next_instance_of(Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable) do |backfill|
allow(backfill).to receive(:perform).and_return(1)
end
expect(migration).to receive(:disable_statement_timeout).and_call_original
expect(migration).to receive(:execute).with("VACUUM FREEZE ANALYZE #{partitioned_table}")
migration.finalize_backfilling_partitioned_table source_table
end
end
end
def filter_columns_by_name(columns, names) def filter_columns_by_name(columns, names)
columns.reject { |c| names.include?(c.name) } columns.reject { |c| names.include?(c.name) }
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment