Commit cecfc1b2 authored by Patrick Bair's avatar Patrick Bair Committed by Mayra Cabrera

Add migration helper to partitioned table by date

Add a new migration helper that can be used to create a copy of an
existing table that is partitioned by a date column.
parent 130168ba
......@@ -3,120 +3,8 @@
module Gitlab
module Database
module PartitioningMigrationHelpers
include SchemaHelpers
def add_partitioned_foreign_key(from_table, to_table, column: nil, primary_key: :id, on_delete: :cascade)
cascade_delete = extract_cascade_option(on_delete)
update_foreign_keys(from_table, to_table, column, primary_key, cascade_delete) do |current_keys, existing_key, specified_key|
if existing_key.nil?
unless specified_key.save
raise "failed to create foreign key: #{specified_key.errors.full_messages.to_sentence}"
end
current_keys << specified_key
else
Rails.logger.warn "foreign key not added because it already exists: #{specified_key}" # rubocop:disable Gitlab/RailsLogger
current_keys
end
end
end
def remove_partitioned_foreign_key(from_table, to_table, column: nil, primary_key: :id)
update_foreign_keys(from_table, to_table, column, primary_key) do |current_keys, existing_key, specified_key|
if existing_key
existing_key.destroy!
current_keys.delete(existing_key)
else
Rails.logger.warn "foreign key not removed because it doesn't exist: #{specified_key}" # rubocop:disable Gitlab/RailsLogger
end
current_keys
end
end
def fk_function_name(table)
object_name(table, 'fk_cascade_function')
end
def fk_trigger_name(table)
object_name(table, 'fk_cascade_trigger')
end
private
def fk_from_spec(from_table, to_table, from_column, to_column, cascade_delete)
PartitionedForeignKey.new(from_table: from_table.to_s, to_table: to_table.to_s, from_column: from_column.to_s,
to_column: to_column.to_s, cascade_delete: cascade_delete)
end
def update_foreign_keys(from_table, to_table, from_column, to_column, cascade_delete = nil)
if transaction_open?
raise 'partitioned foreign key operations can not be run inside a transaction block, ' \
'you can disable transaction blocks by calling disable_ddl_transaction! ' \
'in the body of your migration class'
end
from_column ||= "#{to_table.to_s.singularize}_id"
specified_key = fk_from_spec(from_table, to_table, from_column, to_column, cascade_delete)
current_keys = PartitionedForeignKey.by_referenced_table(to_table).to_a
existing_key = find_existing_key(current_keys, specified_key)
final_keys = yield current_keys, existing_key, specified_key
fn_name = fk_function_name(to_table)
trigger_name = fk_trigger_name(to_table)
with_lock_retries do
drop_trigger(to_table, trigger_name, if_exists: true)
if final_keys.empty?
drop_function(fn_name, if_exists: true)
else
create_or_replace_fk_function(fn_name, final_keys)
create_function_trigger(trigger_name, fn_name, fires: "AFTER DELETE ON #{to_table}")
end
end
end
def extract_cascade_option(on_delete)
case on_delete
when :cascade then true
when :nullify then false
else raise ArgumentError, "invalid option #{on_delete} for :on_delete"
end
end
def with_lock_retries(&block)
Gitlab::Database::WithLockRetries.new({
klass: self.class,
logger: Gitlab::BackgroundMigration::Logger
}).run(&block)
end
def find_existing_key(keys, key)
keys.find { |k| k.from_table == key.from_table && k.from_column == key.from_column }
end
def create_or_replace_fk_function(fn_name, fk_specs)
create_trigger_function(fn_name, replace: true) do
cascade_statements = build_cascade_statements(fk_specs)
cascade_statements << 'RETURN OLD;'
cascade_statements.join("\n")
end
end
def build_cascade_statements(foreign_keys)
foreign_keys.map do |fks|
if fks.cascade_delete?
"DELETE FROM #{fks.from_table} WHERE #{fks.from_column} = OLD.#{fks.to_column};"
else
"UPDATE #{fks.from_table} SET #{fks.from_column} = NULL WHERE #{fks.from_column} = OLD.#{fks.to_column};"
end
end
end
include ForeignKeyHelpers
include TableManagementHelpers
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module PartitioningMigrationHelpers
module ForeignKeyHelpers
include ::Gitlab::Database::SchemaHelpers
# Creates a "foreign key" that references a partitioned table. Because foreign keys referencing partitioned
# tables are not supported in PG11, this does not create a true database foreign key, but instead implements the
# same functionality at the database level by using triggers.
#
# Example:
#
# add_partitioned_foreign_key :issues, :projects
#
# Available options:
#
# :column - name of the referencing column (otherwise inferred from the referenced table name)
# :primary_key - name of the primary key in the referenced table (defaults to id)
# :on_delete - supports either :cascade for ON DELETE CASCADE or :nullify for ON DELETE SET NULL
#
def add_partitioned_foreign_key(from_table, to_table, column: nil, primary_key: :id, on_delete: :cascade)
cascade_delete = extract_cascade_option(on_delete)
update_foreign_keys(from_table, to_table, column, primary_key, cascade_delete) do |current_keys, existing_key, specified_key|
if existing_key.nil?
unless specified_key.save
raise "failed to create foreign key: #{specified_key.errors.full_messages.to_sentence}"
end
current_keys << specified_key
else
Rails.logger.warn "foreign key not added because it already exists: #{specified_key}" # rubocop:disable Gitlab/RailsLogger
current_keys
end
end
end
# Drops a "foreign key" that references a partitioned table. This method ONLY applies to foreign keys previously
# created through the `add_partitioned_foreign_key` method. Standard database foreign keys should be managed
# through the familiar Rails helpers.
#
# Example:
#
# remove_partitioned_foreign_key :issues, :projects
#
# Available options:
#
# :column - name of the referencing column (otherwise inferred from the referenced table name)
# :primary_key - name of the primary key in the referenced table (defaults to id)
#
def remove_partitioned_foreign_key(from_table, to_table, column: nil, primary_key: :id)
update_foreign_keys(from_table, to_table, column, primary_key) do |current_keys, existing_key, specified_key|
if existing_key
existing_key.delete
current_keys.delete(existing_key)
else
Rails.logger.warn "foreign key not removed because it doesn't exist: #{specified_key}" # rubocop:disable Gitlab/RailsLogger
end
current_keys
end
end
private
def fk_function_name(table)
object_name(table, 'fk_cascade_function')
end
def fk_trigger_name(table)
object_name(table, 'fk_cascade_trigger')
end
def fk_from_spec(from_table, to_table, from_column, to_column, cascade_delete)
PartitionedForeignKey.new(from_table: from_table.to_s, to_table: to_table.to_s, from_column: from_column.to_s,
to_column: to_column.to_s, cascade_delete: cascade_delete)
end
def update_foreign_keys(from_table, to_table, from_column, to_column, cascade_delete = nil)
if transaction_open?
raise 'partitioned foreign key operations can not be run inside a transaction block, ' \
'you can disable transaction blocks by calling disable_ddl_transaction! ' \
'in the body of your migration class'
end
from_column ||= "#{to_table.to_s.singularize}_id"
specified_key = fk_from_spec(from_table, to_table, from_column, to_column, cascade_delete)
current_keys = PartitionedForeignKey.by_referenced_table(to_table).to_a
existing_key = find_existing_key(current_keys, specified_key)
final_keys = yield current_keys, existing_key, specified_key
fn_name = fk_function_name(to_table)
trigger_name = fk_trigger_name(to_table)
with_lock_retries do
drop_trigger(to_table, trigger_name, if_exists: true)
if final_keys.empty?
drop_function(fn_name, if_exists: true)
else
create_or_replace_fk_function(fn_name, final_keys)
create_function_trigger(trigger_name, fn_name, fires: "AFTER DELETE ON #{to_table}")
end
end
end
def extract_cascade_option(on_delete)
case on_delete
when :cascade then true
when :nullify then false
else raise ArgumentError, "invalid option #{on_delete} for :on_delete"
end
end
def with_lock_retries(&block)
Gitlab::Database::WithLockRetries.new({
klass: self.class,
logger: Gitlab::BackgroundMigration::Logger
}).run(&block)
end
def find_existing_key(keys, key)
keys.find { |k| k.from_table == key.from_table && k.from_column == key.from_column }
end
def create_or_replace_fk_function(fn_name, fk_specs)
create_trigger_function(fn_name, replace: true) do
cascade_statements = build_cascade_statements(fk_specs)
cascade_statements << 'RETURN OLD;'
cascade_statements.join("\n")
end
end
def build_cascade_statements(foreign_keys)
foreign_keys.map do |fks|
if fks.cascade_delete?
"DELETE FROM #{fks.from_table} WHERE #{fks.from_column} = OLD.#{fks.to_column};"
else
"UPDATE #{fks.from_table} SET #{fks.from_column} = NULL WHERE #{fks.from_column} = OLD.#{fks.to_column};"
end
end
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module PartitioningMigrationHelpers
module TableManagementHelpers
include SchemaHelpers
# Creates a partitioned copy of an existing table, using a RANGE partitioning strategy on a timestamp column.
# One partition is created per month between the given `min_date` and `max_date`.
#
# A copy of the original table is required as PG currently does not support partitioning existing tables.
#
# Example:
#
# partition_table_by_date :audit_events, :created_at, min_date: Date.new(2020, 1), max_date: Date.new(2020, 6)
#
# Required options are:
# :min_date - a date specifying the lower bounds of the partition range
# :max_date - a date specifying the upper bounds of the partitioning range
#
def partition_table_by_date(table_name, column_name, min_date:, max_date:)
raise "max_date #{max_date} must be greater than min_date #{min_date}" if min_date >= max_date
primary_key = connection.primary_key(table_name)
raise "primary key not defined for #{table_name}" if primary_key.nil?
partition_column = find_column_definition(table_name, column_name)
raise "partition column #{column_name} does not exist on #{table_name}" if partition_column.nil?
new_table_name = partitioned_table_name(table_name)
create_range_partitioned_copy(new_table_name, table_name, partition_column, primary_key)
create_daterange_partitions(new_table_name, partition_column.name, min_date, max_date)
end
# Clean up a partitioned copy of an existing table. This deletes the partitioned table and all partitions.
#
# Example:
#
# drop_partitioned_table_for :audit_events
#
def drop_partitioned_table_for(table_name)
drop_table(partitioned_table_name(table_name))
end
private
def partitioned_table_name(table)
tmp_table_name("#{table}_part")
end
def find_column_definition(table, column)
connection.columns(table).find { |c| c.name == column.to_s }
end
def create_range_partitioned_copy(table_name, template_table_name, partition_column, primary_key)
tmp_column_name = object_name(partition_column.name, 'partition_key')
execute(<<~SQL)
CREATE TABLE #{table_name} (
LIKE #{template_table_name} INCLUDING ALL EXCLUDING INDEXES,
#{tmp_column_name} #{partition_column.sql_type} NOT NULL,
PRIMARY KEY (#{[primary_key, tmp_column_name].join(", ")})
) PARTITION BY RANGE (#{tmp_column_name})
SQL
remove_column(table_name, partition_column.name)
rename_column(table_name, tmp_column_name, partition_column.name)
change_column_default(table_name, primary_key, nil)
end
def create_daterange_partitions(table_name, column_name, min_date, max_date)
min_date = min_date.beginning_of_month.to_date
max_date = max_date.next_month.beginning_of_month.to_date
create_range_partition("#{table_name}_000000", table_name, 'MINVALUE', to_sql_date_literal(min_date))
while min_date < max_date
partition_name = "#{table_name}_#{min_date.strftime('%Y%m')}"
next_date = min_date.next_month
lower_bound = to_sql_date_literal(min_date)
upper_bound = to_sql_date_literal(next_date)
create_range_partition(partition_name, table_name, lower_bound, upper_bound)
min_date = next_date
end
end
def to_sql_date_literal(date)
connection.quote(date.strftime('%Y-%m-%d'))
end
def create_range_partition(partition_name, table_name, lower_bound, upper_bound)
execute(<<~SQL)
CREATE TABLE #{partition_name} PARTITION OF #{table_name}
FOR VALUES FROM (#{lower_bound}) TO (#{upper_bound})
SQL
end
end
end
end
end
......@@ -35,6 +35,12 @@ module Gitlab
execute("DROP TRIGGER #{exists_clause} #{name} ON #{table_name}")
end
def tmp_table_name(base)
hashed_base = Digest::SHA256.hexdigest(base).first(10)
"#{base}_#{hashed_base}"
end
def object_name(table, type)
identifier = "#{table}_#{type}"
hashed_identifier = Digest::SHA256.hexdigest(identifier).first(10)
......
......@@ -2,17 +2,19 @@
require 'spec_helper'
describe Gitlab::Database::PartitioningMigrationHelpers do
describe Gitlab::Database::PartitioningMigrationHelpers::ForeignKeyHelpers do
let(:model) do
ActiveRecord::Migration.new.extend(described_class)
end
let_it_be(:connection) { ActiveRecord::Base.connection }
let(:referenced_table) { :issues }
let(:function_name) { model.fk_function_name(referenced_table) }
let(:trigger_name) { model.fk_trigger_name(referenced_table) }
let(:function_name) { '_test_partitioned_foreign_keys_function' }
let(:trigger_name) { '_test_partitioned_foreign_keys_trigger' }
before do
allow(model).to receive(:puts)
allow(model).to receive(:fk_function_name).and_return(function_name)
allow(model).to receive(:fk_trigger_name).and_return(trigger_name)
end
describe 'adding a foreign key' do
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers do
include PartitioningHelpers
let(:model) do
ActiveRecord::Migration.new.extend(described_class)
end
let_it_be(:connection) { ActiveRecord::Base.connection }
let(:template_table) { :audit_events }
let(:partitioned_table) { '_test_migration_partitioned_table' }
let(:partition_column) { 'created_at' }
let(:min_date) { Date.new(2019, 12) }
let(:max_date) { Date.new(2020, 3) }
before do
allow(model).to receive(:puts)
allow(model).to receive(:partitioned_table_name).and_return(partitioned_table)
end
describe '#partition_table_by_date' do
let(:old_primary_key) { 'id' }
let(:new_primary_key) { [old_primary_key, partition_column] }
context 'when the the max_date is less than the min_date' do
let(:max_date) { Time.utc(2019, 6) }
it 'raises an error' do
expect do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
end.to raise_error(/max_date #{max_date} must be greater than min_date #{min_date}/)
end
end
context 'when the max_date is equal to the min_date' do
let(:max_date) { min_date }
it 'raises an error' do
expect do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
end.to raise_error(/max_date #{max_date} must be greater than min_date #{min_date}/)
end
end
context 'when the given table does not have a primary key' do
let(:template_table) { :_partitioning_migration_helper_test_table }
let(:partition_column) { :some_field }
it 'raises an error' do
model.create_table template_table, id: false do |t|
t.integer :id
t.datetime partition_column
end
expect do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
end.to raise_error(/primary key not defined for #{template_table}/)
end
end
context 'when an invalid partition column is given' do
let(:partition_column) { :_this_is_not_real }
it 'raises an error' do
expect do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
end.to raise_error(/partition column #{partition_column} does not exist/)
end
end
context 'when a valid source table and partition column is given' do
it 'creates a table partitioned by the proper column' do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
expect(connection.table_exists?(partitioned_table)).to be(true)
expect(connection.primary_key(partitioned_table)).to eq(new_primary_key)
expect_table_partitioned_by(partitioned_table, [partition_column])
end
it 'removes the default from the primary key column' do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
pk_column = connection.columns(partitioned_table).find { |c| c.name == old_primary_key }
expect(pk_column.default_function).to be_nil
end
it 'creates the partitioned table with the same non-key columns' do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
copied_columns = filter_columns_by_name(connection.columns(partitioned_table), new_primary_key)
original_columns = filter_columns_by_name(connection.columns(template_table), new_primary_key)
expect(copied_columns).to match_array(original_columns)
end
it 'creates a partition spanning over each month in the range given' do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
expect_range_partition_of("#{partitioned_table}_000000", partitioned_table, 'MINVALUE', "'2019-12-01 00:00:00'")
expect_range_partition_of("#{partitioned_table}_201912", partitioned_table, "'2019-12-01 00:00:00'", "'2020-01-01 00:00:00'")
expect_range_partition_of("#{partitioned_table}_202001", partitioned_table, "'2020-01-01 00:00:00'", "'2020-02-01 00:00:00'")
expect_range_partition_of("#{partitioned_table}_202002", partitioned_table, "'2020-02-01 00:00:00'", "'2020-03-01 00:00:00'")
end
end
end
describe '#drop_partitioned_table_for' do
let(:expected_tables) do
%w[000000 201912 202001 202002].map { |suffix| "#{partitioned_table}_#{suffix}" }.unshift(partitioned_table)
end
it 'drops the partitioned copy and all partitions' do
model.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
expected_tables.each do |table|
expect(connection.table_exists?(table)).to be(true)
end
model.drop_partitioned_table_for template_table
expected_tables.each do |table|
expect(connection.table_exists?(table)).to be(false)
end
end
end
def filter_columns_by_name(columns, names)
columns.reject { |c| names.include?(c.name) }
end
end
# frozen_string_literal: true
module PartitioningHelpers
def expect_table_partitioned_by(table, columns, part_type: :range)
columns_with_part_type = columns.map { |c| [part_type.to_s, c] }
actual_columns = find_partitioned_columns(table)
expect(columns_with_part_type).to match_array(actual_columns)
end
def expect_range_partition_of(partition_name, table_name, min_value, max_value)
definition = find_partition_definition(partition_name)
expect(definition).not_to be_nil
expect(definition['base_table']).to eq(table_name.to_s)
expect(definition['condition']).to eq("FOR VALUES FROM (#{min_value}) TO (#{max_value})")
end
private
def find_partitioned_columns(table)
connection.select_rows(<<~SQL)
select
case partstrat
when 'l' then 'list'
when 'r' then 'range'
when 'h' then 'hash'
end as partstrat,
cols.column_name
from (
select partrelid, partstrat, unnest(partattrs) as col_pos
from pg_partitioned_table
) pg_part
inner join pg_class
on pg_part.partrelid = pg_class.oid
inner join information_schema.columns cols
on cols.table_name = pg_class.relname
and cols.ordinal_position = pg_part.col_pos
where pg_class.relname = '#{table}';
SQL
end
def find_partition_definition(partition)
connection.select_one(<<~SQL)
select
parent_class.relname as base_table,
pg_get_expr(pg_class.relpartbound, inhrelid) as condition
from pg_class
inner join pg_inherits i on pg_class.oid = inhrelid
inner join pg_class parent_class on parent_class.oid = inhparent
where pg_class.relname = '#{partition}' and pg_class.relispartition;
SQL
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment