Commit 460acd7d authored by Patrick Bair, committed by Mayra Cabrera

Split BG migration helpers into their own module

Move the BackgroundMigration helper methods out of MigrationHelpers and
into their own module, so they can be reused in other contexts without
pulling in the entire MigrationHelpers module.
parent 7c5cfb4d
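What the split enables, as a minimal sketch: any class can now mix in the background-migration scheduling helpers without inheriting the full set of schema-migration helpers. The class name below is hypothetical, 'ProcessRoutes' is the job name used in the doc examples, and the sketch assumes the GitLab application environment is loaded:

class BackfillScheduler
  include Gitlab::Database::Migrations::BackgroundMigrationHelpers

  # Queue 'ProcessRoutes' jobs over the given model in id-range batches,
  # spaced two minutes apart. model_class must include EachBatch and
  # have an id column.
  def schedule(model_class)
    queue_background_migration_jobs_by_range_at_intervals(
      model_class, 'ProcessRoutes', 2.minutes, batch_size: 10_000)
  end
end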
# frozen_string_literal: true
module Gitlab
module Database
module DynamicModelHelpers
def define_batchable_model(table_name)
Class.new(ActiveRecord::Base) do
include EachBatch
self.table_name = table_name
self.inheritance_column = :_type_disabled
end
end
end
end
end
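The helper above lets migration-level code iterate any table in batches without depending on application models (which can be renamed or removed after a migration ships); disabling the inheritance column also avoids single-table-inheritance lookups when the table happens to have a `type` column. A minimal usage sketch, assuming the caller includes the module and 'routes' is an illustrative table:

model = define_batchable_model('routes')
model.each_batch(of: 1_000) do |relation|
  start_id, end_id = relation.pluck(Arel.sql('MIN(id), MAX(id)')).first
  # hand off start_id..end_id to a background job here
end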
@@ -3,10 +3,10 @@
module Gitlab
module Database
module MigrationHelpers
include Migrations::BackgroundMigrationHelpers
# https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
MAX_IDENTIFIER_NAME_LENGTH = 63
BACKGROUND_MIGRATION_BATCH_SIZE = 1000 # Number of rows to process per job
BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1000 # Number of jobs to bulk queue at a time
PERMITTED_TIMESTAMP_COLUMNS = %i[created_at updated_at deleted_at].to_set.freeze
DEFAULT_TIMESTAMP_COLUMNS = %i[created_at updated_at].freeze
@@ -786,10 +786,6 @@ module Gitlab
end
end
def perform_background_migration_inline?
Rails.env.test? || Rails.env.development?
end
# Performs a concurrent column rename when using PostgreSQL.
def install_rename_triggers_for_postgresql(trigger, table, old, new)
execute <<-EOF.strip_heredoc
@@ -973,106 +969,6 @@ into similar problems in the future (e.g. when new tables are created).
end
end
# Bulk queues background migration jobs for an entire table, batched by ID range.
# "Bulk" meaning many jobs will be pushed at a time for efficiency.
# If you need a delay interval per job, then use `queue_background_migration_jobs_by_range_at_intervals`.
#
# model_class - The table being iterated over
# job_class_name - The background migration job class as a string
# batch_size - The maximum number of rows per job
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# bulk_queue_background_migration_jobs_by_range(Route, 'ProcessRoutes')
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def bulk_queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
jobs = []
table_name = model_class.quoted_table_name
model_class.each_batch(of: batch_size) do |relation|
start_id, end_id = relation.pluck("MIN(#{table_name}.id)", "MAX(#{table_name}.id)").first
if jobs.length >= BACKGROUND_MIGRATION_JOB_BUFFER_SIZE
# Note: This code path generally only helps with many millions of rows
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range.
bulk_migrate_async(jobs)
jobs.clear
end
jobs << [job_class_name, [start_id, end_id]]
end
bulk_migrate_async(jobs) unless jobs.empty?
end
# Queues background migration jobs for an entire table, batched by ID range.
# Each job is scheduled with a `delay_interval` in between.
# If you use a small interval, then some jobs may run at the same time.
#
# model_class - The table or relation being iterated over
# job_class_name - The background migration job class as a string
# delay_interval - The duration between each job's scheduled time (must respond to `to_f`)
# batch_size - The maximum number of rows per job
# other_job_arguments - Other arguments to send to the job
#
# *Returns the final migration delay*
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# queue_background_migration_jobs_by_range_at_intervals(Route, 'ProcessRoutes', 1.minute)
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def queue_background_migration_jobs_by_range_at_intervals(model_class, job_class_name, delay_interval, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE, other_job_arguments: [], initial_delay: 0)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
# To avoid overloading the worker, we enforce a minimum interval both
# when scheduling and performing jobs.
if delay_interval < BackgroundMigrationWorker.minimum_interval
delay_interval = BackgroundMigrationWorker.minimum_interval
end
final_delay = 0
model_class.each_batch(of: batch_size) do |relation, index|
start_id, end_id = relation.pluck(Arel.sql('MIN(id), MAX(id)')).first
# `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for
# the same time, which is not helpful in most cases where we wish to
# spread the work over time.
final_delay = initial_delay + delay_interval * index
migrate_in(final_delay, job_class_name, [start_id, end_id] + other_job_arguments)
end
final_delay
end
# Fetches indexes on a column by name for postgres.
#
# This will include indexes using an expression on the column, for example:
@@ -1131,30 +1027,6 @@ into similar problems in the future (e.g. when new tables are created).
execute(sql)
end
def migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.perform_async(*args)
end
end
def migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.perform_in(*args)
end
end
def bulk_migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_in(*args)
end
end
def bulk_migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_async(*args)
end
end
# Returns the name for a check constraint
#
# type:
@@ -1437,10 +1309,6 @@ into similar problems in the future (e.g. when new tables are created).
your migration class
ERROR
end
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module Migrations
module BackgroundMigrationHelpers
BACKGROUND_MIGRATION_BATCH_SIZE = 1_000 # Number of rows to process per job
BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1_000 # Number of jobs to bulk queue at a time
# Bulk queues background migration jobs for an entire table, batched by ID range.
# "Bulk" meaning many jobs will be pushed at a time for efficiency.
# If you need a delay interval per job, then use `queue_background_migration_jobs_by_range_at_intervals`.
#
# model_class - The table being iterated over
# job_class_name - The background migration job class as a string
# batch_size - The maximum number of rows per job
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# bulk_queue_background_migration_jobs_by_range(Route, 'ProcessRoutes')
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def bulk_queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
jobs = []
table_name = model_class.quoted_table_name
model_class.each_batch(of: batch_size) do |relation|
start_id, end_id = relation.pluck("MIN(#{table_name}.id)", "MAX(#{table_name}.id)").first
if jobs.length >= BACKGROUND_MIGRATION_JOB_BUFFER_SIZE
# Note: This code path generally only helps with many millions of rows
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range.
bulk_migrate_async(jobs)
jobs.clear
end
jobs << [job_class_name, [start_id, end_id]]
end
bulk_migrate_async(jobs) unless jobs.empty?
end
# Queues background migration jobs for an entire table, batched by ID range.
# Each job is scheduled with a `delay_interval` in between.
# If you use a small interval, then some jobs may run at the same time.
#
# model_class - The table or relation being iterated over
# job_class_name - The background migration job class as a string
# delay_interval - The duration between each job's scheduled time (must respond to `to_f`)
# batch_size - The maximum number of rows per job
# other_job_arguments - Other arguments to send to the job
#
# *Returns the final migration delay*
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# queue_background_migration_jobs_by_range_at_intervals(Route, 'ProcessRoutes', 1.minute)
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def queue_background_migration_jobs_by_range_at_intervals(model_class, job_class_name, delay_interval, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE, other_job_arguments: [], initial_delay: 0)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
# To avoid overloading the worker, we enforce a minimum interval both
# when scheduling and performing jobs.
if delay_interval < BackgroundMigrationWorker.minimum_interval
delay_interval = BackgroundMigrationWorker.minimum_interval
end
final_delay = 0
model_class.each_batch(of: batch_size) do |relation, index|
start_id, end_id = relation.pluck(Arel.sql('MIN(id), MAX(id)')).first
# `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for
# the same time, which is not helpful in most cases where we wish to
# spread the work over time.
final_delay = initial_delay + delay_interval * index
migrate_in(final_delay, job_class_name, [start_id, end_id] + other_job_arguments)
end
final_delay
end
def perform_background_migration_inline?
Rails.env.test? || Rails.env.development?
end
def migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.perform_async(*args)
end
end
def migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.perform_in(*args)
end
end
def bulk_migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_in(*args)
end
end
def bulk_migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_async(*args)
end
end
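# Note: each bulk_* helper takes an array of per-job argument lists in the
# shape queued above, e.g. (illustrative id ranges):
#   bulk_migrate_async([['ProcessRoutes', [1, 100]], ['ProcessRoutes', [101, 200]]])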
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module PartitioningMigrationHelpers
# Class that will generically copy data from a given table into its corresponding partitioned table
class BackfillPartitionedTable
include ::Gitlab::Database::DynamicModelHelpers
SUB_BATCH_SIZE = 2_500
PAUSE_SECONDS = 0.25
def perform(start_id, stop_id, source_table, partitioned_table, source_column)
return unless Feature.enabled?(:backfill_partitioned_audit_events, default_enabled: true)
if transaction_open?
raise "Aborting job to backfill partitioned #{source_table} table! Do not run this job in a transaction block!"
end
unless table_exists?(partitioned_table)
logger.warn "exiting backfill migration because partitioned table #{partitioned_table} does not exist. " \
"This could be due to the migration being rolled back after migration jobs were enqueued in sidekiq"
return
end
bulk_copy = BulkCopy.new(source_table, partitioned_table, source_column)
parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id)
parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch|
start_id, stop_id = sub_batch.pluck(Arel.sql("MIN(#{source_column}), MAX(#{source_column})")).first
bulk_copy.copy_between(start_id, stop_id)
sleep(PAUSE_SECONDS)
end
end
private
def connection
ActiveRecord::Base.connection
end
def transaction_open?
connection.transaction_open?
end
def table_exists?(table)
connection.table_exists?(table)
end
def logger
@logger ||= ::Gitlab::BackgroundMigration::Logger.build
end
def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id)
define_batchable_model(source_table).where(source_key_column => start_id..stop_id)
end
# Helper class to copy data between two tables via upserts
class BulkCopy
DELIMITER = ', '
attr_reader :source_table, :destination_table, :source_column
def initialize(source_table, destination_table, source_column)
@source_table = source_table
@destination_table = destination_table
@source_column = source_column
end
def copy_between(start_id, stop_id)
connection.execute(<<~SQL)
INSERT INTO #{destination_table} (#{column_listing})
SELECT #{column_listing}
FROM #{source_table}
WHERE #{source_column} BETWEEN #{start_id} AND #{stop_id}
FOR UPDATE
ON CONFLICT (#{conflict_targets}) DO NOTHING
SQL
end
private
def connection
@connection ||= ActiveRecord::Base.connection
end
def column_listing
@column_listing ||= connection.columns(source_table).map(&:name).join(DELIMITER)
end
def conflict_targets
connection.primary_key(destination_table).join(DELIMITER)
end
end
end
end
end
end
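For reference, a hedged sketch of invoking the backfill job directly; in practice the arguments arrive via BackgroundMigrationWorker, queued by enqueue_background_migration later in this commit, and the table names and ids here are illustrative:

job = Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable.new
job.perform(1, 50_000, 'audit_events', 'audit_events_part', 'id')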
@@ -99,7 +99,7 @@ module Gitlab
drop_function(fn_name, if_exists: true)
else
create_or_replace_fk_function(fn_name, final_keys)
create_trigger(trigger_name, fn_name, fires: "AFTER DELETE ON #{to_table}")
create_trigger(to_table, trigger_name, fn_name, fires: 'AFTER DELETE')
end
end
end
......
@@ -5,10 +5,16 @@ module Gitlab
module PartitioningMigrationHelpers
module TableManagementHelpers
include ::Gitlab::Database::SchemaHelpers
include ::Gitlab::Database::DynamicModelHelpers
include ::Gitlab::Database::Migrations::BackgroundMigrationHelpers
WHITELISTED_TABLES = %w[audit_events].freeze
ALLOWED_TABLES = %w[audit_events].freeze
ERROR_SCOPE = 'table partitioning'
MIGRATION_CLASS_NAME = "::#{module_parent_name}::BackfillPartitionedTable"
BATCH_INTERVAL = 2.minutes.freeze
BATCH_SIZE = 50_000
# Creates a partitioned copy of an existing table, using a RANGE partitioning strategy on a timestamp column.
# One partition is created per month between the given `min_date` and `max_date`.
#
@@ -23,7 +29,7 @@ module Gitlab
# :max_date - a date specifying the upper bounds of the partitioning range
#
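# Example (an illustrative call; dates are hypothetical):
#
#   partition_table_by_date :audit_events, :created_at,
#     min_date: Date.new(2019, 12), max_date: Date.new(2020, 3)
#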
def partition_table_by_date(table_name, column_name, min_date:, max_date:)
assert_table_is_whitelisted(table_name)
assert_table_is_allowed(table_name)
assert_not_in_transaction_block(scope: ERROR_SCOPE)
raise "max_date #{max_date} must be greater than min_date #{min_date}" if min_date >= max_date
@@ -35,9 +41,11 @@ module Gitlab
raise "partition column #{column_name} does not exist on #{table_name}" if partition_column.nil?
new_table_name = partitioned_table_name(table_name)
create_range_partitioned_copy(new_table_name, table_name, partition_column, primary_key)
create_daterange_partitions(new_table_name, partition_column.name, min_date, max_date)
create_sync_trigger(table_name, new_table_name, primary_key)
create_trigger_to_sync_tables(table_name, new_table_name, primary_key)
enqueue_background_migration(table_name, new_table_name, primary_key)
end
# Clean up a partitioned copy of an existing table. This deletes the partitioned table and all partitions.
@@ -47,7 +55,7 @@
# drop_partitioned_table_for :audit_events
#
def drop_partitioned_table_for(table_name)
assert_table_is_whitelisted(table_name)
assert_table_is_allowed(table_name)
assert_not_in_transaction_block(scope: ERROR_SCOPE)
with_lock_retries do
@@ -64,10 +72,10 @@ module Gitlab
private
def assert_table_is_whitelisted(table_name)
return if WHITELISTED_TABLES.include?(table_name.to_s)
def assert_table_is_allowed(table_name)
return if ALLOWED_TABLES.include?(table_name.to_s)
raise "partitioning helpers are in active development, and #{table_name} is not whitelisted for use, " \
raise "partitioning helpers are in active development, and #{table_name} is not allowed for use, " \
"for more information please contact the database team"
end
@@ -125,7 +133,8 @@ module Gitlab
min_date = min_date.beginning_of_month.to_date
max_date = max_date.next_month.beginning_of_month.to_date
create_range_partition_safely("#{table_name}_000000", table_name, 'MINVALUE', to_sql_date_literal(min_date))
upper_bound = to_sql_date_literal(min_date)
create_range_partition_safely("#{table_name}_000000", table_name, 'MINVALUE', upper_bound)
while min_date < max_date
partition_name = "#{table_name}_#{min_date.strftime('%Y%m')}"
@@ -154,7 +163,7 @@ module Gitlab
create_range_partition(partition_name, table_name, lower_bound, upper_bound)
end
def create_sync_trigger(source_table, target_table, unique_key)
def create_trigger_to_sync_tables(source_table, target_table, unique_key)
function_name = sync_function_name(source_table)
trigger_name = sync_trigger_name(source_table)
@@ -162,11 +171,19 @@ module Gitlab
create_sync_function(function_name, target_table, unique_key)
create_comment('FUNCTION', function_name, "Partitioning migration: table sync for #{source_table} table")
create_trigger(trigger_name, function_name, fires: "AFTER INSERT OR UPDATE OR DELETE ON #{source_table}")
create_sync_trigger(source_table, trigger_name, function_name)
end
end
def create_sync_function(name, target_table, unique_key)
if function_exists?(name)
# rubocop:disable Gitlab/RailsLogger
Rails.logger.warn "Partitioning sync function not created because it already exists" \
" (this may be due to an aborted migration or similar): function name: #{name}"
# rubocop:enable Gitlab/RailsLogger
return
end
delimiter = ",\n "
column_names = connection.columns(target_table).map(&:name)
set_statements = build_set_statements(column_names, unique_key)
@@ -190,7 +207,30 @@ module Gitlab
end
def build_set_statements(column_names, unique_key)
column_names.reject { |name| name == unique_key }.map { |column_name| "#{column_name} = NEW.#{column_name}" }
column_names.reject { |name| name == unique_key }.map { |name| "#{name} = NEW.#{name}" }
end
def create_sync_trigger(table_name, trigger_name, function_name)
if trigger_exists?(table_name, trigger_name)
# rubocop:disable Gitlab/RailsLogger
Rails.logger.warn "Partitioning sync trigger not created because it already exists" \
" (this may be due to an aborted migration or similar): trigger name: #{trigger_name}"
# rubocop:enable Gitlab/RailsLogger
return
end
create_trigger(table_name, trigger_name, function_name, fires: 'AFTER INSERT OR UPDATE OR DELETE')
end
def enqueue_background_migration(source_table, partitioned_table, source_key)
model_class = define_batchable_model(source_table)
queue_background_migration_jobs_by_range_at_intervals(
model_class,
MIGRATION_CLASS_NAME,
BATCH_INTERVAL,
batch_size: BATCH_SIZE,
other_job_arguments: [source_table.to_s, partitioned_table, source_key])
end
end
end
......
@@ -16,15 +16,30 @@ module Gitlab
SQL
end
def create_trigger(name, function_name, fires: nil)
def function_exists?(name)
connection.select_value("SELECT 1 FROM pg_proc WHERE proname = '#{name}'")
end
def create_trigger(table_name, name, function_name, fires:)
execute(<<~SQL)
CREATE TRIGGER #{name}
#{fires}
#{fires} ON #{table_name}
FOR EACH ROW
EXECUTE PROCEDURE #{function_name}()
SQL
end
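# With the revised signature the target table is an explicit argument and
# `fires` carries only the event clause, e.g. (illustrative names):
#   create_trigger('audit_events', 'table_sync_trigger', 'table_sync_function',
#                  fires: 'AFTER INSERT OR UPDATE OR DELETE')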
def trigger_exists?(table_name, name)
connection.select_value(<<~SQL)
SELECT 1
FROM pg_trigger
INNER JOIN pg_class
ON pg_trigger.tgrelid = pg_class.oid
WHERE pg_class.relname = '#{table_name}'
AND pg_trigger.tgname = '#{name}'
SQL
end
def drop_function(name, if_exists: true)
exists_clause = optional_clause(if_exists, "IF EXISTS")
execute("DROP FUNCTION #{exists_clause} #{name}()")
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Database::DynamicModelHelpers do
describe '#define_batchable_model' do
subject { including_class.new.define_batchable_model(table_name) }
let(:including_class) { Class.new.include(described_class) }
let(:table_name) { 'projects' }
it 'is an ActiveRecord model' do
expect(subject.ancestors).to include(ActiveRecord::Base)
end
it 'includes EachBatch' do
expect(subject.included_modules).to include(EachBatch)
end
it 'has the correct table name' do
expect(subject.table_name).to eq(table_name)
end
it 'has the inheritance type column disabled' do
expect(subject.inheritance_column).to eq('_type_disabled')
end
end
end
@@ -1215,166 +1215,6 @@ describe Gitlab::Database::MigrationHelpers do
end
end
describe '#bulk_queue_background_migration_jobs_by_range' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
before do
User.class_eval do
include EachBatch
end
end
context 'with enough rows to bulk queue jobs more than once' do
before do
stub_const('Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_JOB_BUFFER_SIZE', 1)
end
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in groups of buffer size 1' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
context 'with not enough rows to bulk queue jobs more than once' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in bulk all at once (big buffer size)' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
context 'without specifying batch_size' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob')
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
end
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.bulk_queue_background_migration_jobs_by_range(ProjectAuthorization, 'FooJob')
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#queue_background_migration_jobs_by_range_at_intervals' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
around do |example|
Timecop.freeze { example.run }
end
before do
User.class_eval do
include EachBatch
end
end
it 'returns the final expected delay' do
Sidekiq::Testing.fake! do
final_delay = model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, batch_size: 2)
expect(final_delay.to_f).to eq(20.minutes.to_f)
end
end
it 'returns zero when nothing gets queued' do
Sidekiq::Testing.fake! do
final_delay = model.queue_background_migration_jobs_by_range_at_intervals(User.none, 'FooJob', 10.minutes)
expect(final_delay).to eq(0)
end
end
context 'with batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
expect(BackgroundMigrationWorker.jobs[1]['at']).to eq(20.minutes.from_now.to_f)
end
end
end
context 'without batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
end
end
end
context 'with other_job_arguments option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2])
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3, 1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
end
end
end
context 'with initial_delay option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2], initial_delay: 10.minutes)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3, 1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(20.minutes.from_now.to_f)
end
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.queue_background_migration_jobs_by_range_at_intervals(ProjectAuthorization, 'FooJob', 10.seconds)
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#change_column_type_using_background_migration' do
let!(:issue) { create(:issue, :closed, closed_at: Time.zone.now) }
@@ -1485,26 +1325,6 @@ describe Gitlab::Database::MigrationHelpers do
end
end
describe '#perform_background_migration_inline?' do
it 'returns true in a test environment' do
stub_rails_env('test')
expect(model.perform_background_migration_inline?).to eq(true)
end
it 'returns true in a development environment' do
stub_rails_env('development')
expect(model.perform_background_migration_inline?).to eq(true)
end
it 'returns false in a production environment' do
stub_rails_env('production')
expect(model.perform_background_migration_inline?).to eq(false)
end
end
describe '#index_exists_by_name?' do
it 'returns true if an index exists' do
ActiveRecord::Base.connection.execute(
@@ -1973,62 +1793,6 @@ describe Gitlab::Database::MigrationHelpers do
end
end
describe '#migrate_async' do
it 'calls BackgroundMigrationWorker.perform_async' do
expect(BackgroundMigrationWorker).to receive(:perform_async).with("Class", "hello", "world")
model.migrate_async("Class", "hello", "world")
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_async('Class', 'hello', 'world')
end
end
describe '#migrate_in' do
it 'calls BackgroundMigrationWorker.perform_in' do
expect(BackgroundMigrationWorker).to receive(:perform_in).with(10.minutes, 'Class', 'Hello', 'World')
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
end
describe '#bulk_migrate_async' do
it 'calls BackgroundMigrationWorker.bulk_perform_async' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([%w(Class hello world)])
model.bulk_migrate_async([%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_async([%w(Class hello world)])
end
end
describe '#bulk_migrate_in' do
it 'calls BackgroundMigrationWorker.bulk_perform_in' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_in).with(10.minutes, [%w(Class hello world)])
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
end
describe '#check_constraint_name' do
it 'returns a valid constraint name' do
name = model.check_constraint_name(:this_is_a_very_long_table_name,
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Database::Migrations::BackgroundMigrationHelpers do
let(:model) do
ActiveRecord::Migration.new.extend(described_class)
end
describe '#bulk_queue_background_migration_jobs_by_range' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
before do
User.class_eval do
include EachBatch
end
end
context 'with enough rows to bulk queue jobs more than once' do
before do
stub_const('Gitlab::Database::Migrations::BackgroundMigrationHelpers::BACKGROUND_MIGRATION_JOB_BUFFER_SIZE', 1)
end
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in groups of buffer size 1' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
context 'with not enough rows to bulk queue jobs more than once' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in bulk all at once (big buffer size)' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
context 'without specifying batch_size' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob')
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
end
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.bulk_queue_background_migration_jobs_by_range(ProjectAuthorization, 'FooJob')
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#queue_background_migration_jobs_by_range_at_intervals' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
around do |example|
Timecop.freeze { example.run }
end
before do
User.class_eval do
include EachBatch
end
end
it 'returns the final expected delay' do
Sidekiq::Testing.fake! do
final_delay = model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, batch_size: 2)
expect(final_delay.to_f).to eq(20.minutes.to_f)
end
end
it 'returns zero when nothing gets queued' do
Sidekiq::Testing.fake! do
final_delay = model.queue_background_migration_jobs_by_range_at_intervals(User.none, 'FooJob', 10.minutes)
expect(final_delay).to eq(0)
end
end
context 'with batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
expect(BackgroundMigrationWorker.jobs[1]['at']).to eq(20.minutes.from_now.to_f)
end
end
end
context 'without batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
end
end
end
context 'with other_job_arguments option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2])
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3, 1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
end
end
end
context 'with initial_delay option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2], initial_delay: 10.minutes)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3, 1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(20.minutes.from_now.to_f)
end
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.queue_background_migration_jobs_by_range_at_intervals(ProjectAuthorization, 'FooJob', 10.seconds)
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#perform_background_migration_inline?' do
it 'returns true in a test environment' do
stub_rails_env('test')
expect(model.perform_background_migration_inline?).to eq(true)
end
it 'returns true in a development environment' do
stub_rails_env('development')
expect(model.perform_background_migration_inline?).to eq(true)
end
it 'returns false in a production environment' do
stub_rails_env('production')
expect(model.perform_background_migration_inline?).to eq(false)
end
end
describe '#migrate_async' do
it 'calls BackgroundMigrationWorker.perform_async' do
expect(BackgroundMigrationWorker).to receive(:perform_async).with("Class", "hello", "world")
model.migrate_async("Class", "hello", "world")
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_async('Class', 'hello', 'world')
end
end
describe '#migrate_in' do
it 'calls BackgroundMigrationWorker.perform_in' do
expect(BackgroundMigrationWorker).to receive(:perform_in).with(10.minutes, 'Class', 'Hello', 'World')
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
end
describe '#bulk_migrate_async' do
it 'calls BackgroundMigrationWorker.bulk_perform_async' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([%w(Class hello world)])
model.bulk_migrate_async([%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_async([%w(Class hello world)])
end
end
describe '#bulk_migrate_in' do
it 'calls BackgroundMigrationWorker.bulk_perform_in' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_in).with(10.minutes, [%w(Class hello world)])
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable, '#perform' do
subject { described_class.new }
let(:source_table) { '_test_partitioning_backfills' }
let(:destination_table) { "#{source_table}_part" }
let(:unique_key) { 'id' }
before do
allow(subject).to receive(:transaction_open?).and_return(false)
end
context 'when the destination table exists' do
before do
connection.execute(<<~SQL)
CREATE TABLE #{source_table} (
id serial NOT NULL PRIMARY KEY,
col1 int NOT NULL,
col2 text NOT NULL,
created_at timestamptz NOT NULL
)
SQL
connection.execute(<<~SQL)
CREATE TABLE #{destination_table} (
id serial NOT NULL,
col1 int NOT NULL,
col2 text NOT NULL,
created_at timestamptz NOT NULL,
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at)
SQL
connection.execute(<<~SQL)
CREATE TABLE #{destination_table}_202001 PARTITION OF #{destination_table}
FOR VALUES FROM ('2020-01-01') TO ('2020-02-01')
SQL
connection.execute(<<~SQL)
CREATE TABLE #{destination_table}_202002 PARTITION OF #{destination_table}
FOR VALUES FROM ('2020-02-01') TO ('2020-03-01')
SQL
source_model.table_name = source_table
destination_model.table_name = destination_table
stub_const("#{described_class}::SUB_BATCH_SIZE", 2)
stub_const("#{described_class}::PAUSE_SECONDS", pause_seconds)
allow(subject).to receive(:sleep)
end
let(:connection) { ActiveRecord::Base.connection }
let(:source_model) { Class.new(ActiveRecord::Base) }
let(:destination_model) { Class.new(ActiveRecord::Base) }
let(:timestamp) { Time.utc(2020, 1, 2).round }
let(:pause_seconds) { 1 }
let!(:source1) { create_source_record(timestamp) }
let!(:source2) { create_source_record(timestamp + 1.day) }
let!(:source3) { create_source_record(timestamp + 1.month) }
it 'copies data into the destination table idempotently' do
expect(destination_model.count).to eq(0)
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(3)
source_model.find_each do |source_record|
destination_record = destination_model.find_by_id(source_record.id)
expect(destination_record.attributes).to eq(source_record.attributes)
end
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(3)
end
it 'breaks the assigned batch into smaller batches' do
expect_next_instance_of(described_class::BulkCopy) do |bulk_copy|
expect(bulk_copy).to receive(:copy_between).with(source1.id, source2.id)
expect(bulk_copy).to receive(:copy_between).with(source3.id, source3.id)
end
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end
it 'pauses after copying each sub-batch' do
expect(subject).to receive(:sleep).with(pause_seconds).twice
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end
context 'when the feature flag is disabled' do
let(:mock_connection) { double('connection') }
before do
allow(subject).to receive(:connection).and_return(mock_connection)
stub_feature_flags(backfill_partitioned_audit_events: false)
end
it 'exits without attempting to copy data' do
expect(mock_connection).not_to receive(:execute)
subject.perform(1, 100, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(0)
end
end
context 'when the job is run within an explicit transaction block' do
let(:mock_connection) { double('connection') }
before do
allow(subject).to receive(:connection).and_return(mock_connection)
allow(subject).to receive(:transaction_open?).and_return(true)
end
it 'raises an error before copying data' do
expect(mock_connection).not_to receive(:execute)
expect do
subject.perform(1, 100, source_table, destination_table, unique_key)
end.to raise_error(/Aborting job to backfill partitioned #{source_table}/)
expect(destination_model.count).to eq(0)
end
end
end
context 'when the destination table does not exist' do
let(:mock_connection) { double('connection') }
let(:mock_logger) { double('logger') }
before do
allow(subject).to receive(:connection).and_return(mock_connection)
allow(subject).to receive(:logger).and_return(mock_logger)
expect(mock_connection).to receive(:table_exists?).with(destination_table).and_return(false)
allow(mock_logger).to receive(:warn)
end
it 'exits without attempting to copy data' do
expect(mock_connection).not_to receive(:execute)
subject.perform(1, 100, source_table, destination_table, unique_key)
end
it 'logs a warning message that the job was skipped' do
expect(mock_logger).to receive(:warn).with(/#{destination_table} does not exist/)
subject.perform(1, 100, source_table, destination_table, unique_key)
end
end
def create_source_record(timestamp)
source_model.create!(col1: 123, col2: 'original value', created_at: timestamp)
end
end
@@ -25,7 +25,7 @@ describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers
allow(migration).to receive(:partitioned_table_name).and_return(partitioned_table)
allow(migration).to receive(:sync_function_name).and_return(function_name)
allow(migration).to receive(:sync_trigger_name).and_return(trigger_name)
allow(migration).to receive(:assert_table_is_whitelisted)
allow(migration).to receive(:assert_table_is_allowed)
end
describe '#partition_table_by_date' do
@@ -33,15 +33,19 @@ describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers
let(:old_primary_key) { 'id' }
let(:new_primary_key) { [old_primary_key, partition_column] }
context 'when the table is not whitelisted' do
let(:template_table) { :this_table_is_not_whitelisted }
before do
allow(migration).to receive(:queue_background_migration_jobs_by_range_at_intervals)
end
context 'when the table is not allowed' do
let(:template_table) { :this_table_is_not_allowed }
it 'raises an error' do
expect(migration).to receive(:assert_table_is_whitelisted).with(template_table).and_call_original
expect(migration).to receive(:assert_table_is_allowed).with(template_table).and_call_original
expect do
migration.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
end.to raise_error(/#{template_table} is not whitelisted for use/)
end.to raise_error(/#{template_table} is not allowed for use/)
end
end
@@ -237,6 +241,36 @@ describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers
expect(model.find(second_todo.id).attributes).to eq(second_todo.attributes)
end
end
describe 'copying historic data to the partitioned table' do
let(:template_table) { 'todos' }
let(:migration_class) { '::Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable' }
let(:sub_batch_size) { described_class::SUB_BATCH_SIZE }
let(:pause_seconds) { described_class::PAUSE_SECONDS }
let!(:first_id) { create(:todo).id }
let!(:second_id) { create(:todo).id }
let!(:third_id) { create(:todo).id }
before do
stub_const("#{described_class.name}::BATCH_SIZE", 2)
expect(migration).to receive(:queue_background_migration_jobs_by_range_at_intervals).and_call_original
end
it 'enqueues jobs to copy each batch of data' do
Sidekiq::Testing.fake! do
migration.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
expect(BackgroundMigrationWorker.jobs.size).to eq(2)
first_job_arguments = [first_id, second_id, template_table, partitioned_table, 'id']
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq([migration_class, first_job_arguments])
second_job_arguments = [third_id, third_id, template_table, partitioned_table, 'id']
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq([migration_class, second_job_arguments])
end
end
end
end
describe '#drop_partitioned_table_for' do
@@ -244,15 +278,15 @@ describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers
%w[000000 201912 202001 202002].map { |suffix| "#{partitioned_table}_#{suffix}" }.unshift(partitioned_table)
end
context 'when the table is not whitelisted' do
let(:template_table) { :this_table_is_not_whitelisted }
context 'when the table is not allowed' do
let(:template_table) { :this_table_is_not_allowed }
it 'raises an error' do
expect(migration).to receive(:assert_table_is_whitelisted).with(template_table).and_call_original
expect(migration).to receive(:assert_table_is_allowed).with(template_table).and_call_original
expect do
migration.drop_partitioned_table_for template_table
end.to raise_error(/#{template_table} is not whitelisted for use/)
end.to raise_error(/#{template_table} is not allowed for use/)
end
end
......