Commit 5f35ea14 authored by Andreas Brandl's avatar Andreas Brandl

Fix concurrency issue with migration for user_interacted_projects table.

The concurrency issue originates from inserts on
`user_interacted_projects` from the app while running the post-deploy
migration.

This change comes with a strategy to lock the table while removing
duplicates and creating the unique index (and similar for FK
constraints).

Also, we'll have a non-unique index until the post-deploy migration is
finished to speed up queries during that time.

Closes #44205.
parent 7e4fcbf9
......@@ -3,13 +3,15 @@ class CreateUserInteractedProjectsTable < ActiveRecord::Migration
DOWNTIME = false
disable_ddl_transaction!
INDEX_NAME = 'user_interacted_projects_non_unique_index'
def up
create_table :user_interacted_projects, id: false do |t|
t.references :user, null: false
t.references :project, null: false
end
add_index :user_interacted_projects, [:project_id, :user_id], name: INDEX_NAME
end
def down
......
require_relative '../migrate/20180223120443_create_user_interacted_projects_table.rb'
# rubocop:disable AddIndex
# rubocop:disable AddConcurrentForeignKey
class BuildUserInteractedProjectsTable < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
# Set this constant to true if this migration requires downtime.
DOWNTIME = false
UNIQUE_INDEX_NAME = 'index_user_interacted_projects_on_project_id_and_user_id'
disable_ddl_transaction!
def up
......@@ -13,16 +18,8 @@ class BuildUserInteractedProjectsTable < ActiveRecord::Migration
MysqlStrategy.new
end.up
unless index_exists?(:user_interacted_projects, [:project_id, :user_id])
add_concurrent_index :user_interacted_projects, [:project_id, :user_id], unique: true
end
unless foreign_key_exists?(:user_interacted_projects, :user_id)
add_concurrent_foreign_key :user_interacted_projects, :users, column: :user_id, on_delete: :cascade
end
unless foreign_key_exists?(:user_interacted_projects, :project_id)
add_concurrent_foreign_key :user_interacted_projects, :projects, column: :project_id, on_delete: :cascade
if index_exists_by_name?(:user_interacted_projects, CreateUserInteractedProjectsTable::INDEX_NAME)
remove_concurrent_index_by_name :user_interacted_projects, CreateUserInteractedProjectsTable::INDEX_NAME
end
end
......@@ -37,31 +34,16 @@ class BuildUserInteractedProjectsTable < ActiveRecord::Migration
remove_foreign_key :user_interacted_projects, :projects
end
if index_exists_by_name?(:user_interacted_projects, 'index_user_interacted_projects_on_project_id_and_user_id')
remove_concurrent_index_by_name :user_interacted_projects, 'index_user_interacted_projects_on_project_id_and_user_id'
if index_exists_by_name?(:user_interacted_projects, UNIQUE_INDEX_NAME)
remove_concurrent_index_by_name :user_interacted_projects, UNIQUE_INDEX_NAME
end
end
private
# Rails' index_exists? doesn't work when you only give it a table and index
# name. As such we have to use some extra code to check if an index exists for
# a given name.
def index_exists_by_name?(table, index)
indexes_for_table[table].include?(index)
end
def indexes_for_table
@indexes_for_table ||= Hash.new do |hash, table_name|
hash[table_name] = indexes(table_name).map(&:name)
unless index_exists_by_name?(:user_interacted_projects, CreateUserInteractedProjectsTable::INDEX_NAME)
add_concurrent_index :user_interacted_projects, [:project_id, :user_id], name: CreateUserInteractedProjectsTable::INDEX_NAME
end
end
def foreign_key_exists?(table, column)
foreign_keys(table).any? do |key|
key.options[:column] == column.to_s
end
end
private
class PostgresStrategy < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
......@@ -71,33 +53,86 @@ class BuildUserInteractedProjectsTable < ActiveRecord::Migration
def up
with_index(:events, [:author_id, :project_id], name: 'events_user_interactions_temp', where: 'project_id IS NOT NULL') do
iteration = 0
records = 0
begin
Rails.logger.info "Building user_interacted_projects table, batch ##{iteration}"
result = execute <<~SQL
insert_missing_records
# Do this once without lock to speed up the second invocation
remove_duplicates
with_table_lock(:user_interacted_projects) do
remove_duplicates
create_unique_index
end
remove_without_project
with_table_lock(:user_interacted_projects, :projects) do
remove_without_project
create_fk :user_interacted_projects, :projects, :project_id
end
remove_without_user
with_table_lock(:user_interacted_projects, :users) do
remove_without_user
create_fk :user_interacted_projects, :users, :user_id
end
end
execute "ANALYZE user_interacted_projects"
end
private
def insert_missing_records
iteration = 0
records = 0
begin
Rails.logger.info "Building user_interacted_projects table, batch ##{iteration}"
result = execute <<~SQL
INSERT INTO user_interacted_projects (user_id, project_id)
SELECT e.user_id, e.project_id
FROM (SELECT DISTINCT author_id AS user_id, project_id FROM events WHERE project_id IS NOT NULL) AS e
LEFT JOIN user_interacted_projects ucp USING (user_id, project_id)
WHERE ucp.user_id IS NULL
LIMIT #{BATCH_SIZE}
SQL
iteration += 1
records += result.cmd_tuples
Rails.logger.info "Building user_interacted_projects table, batch ##{iteration} complete, created #{records} overall"
Kernel.sleep(SLEEP_TIME) if result.cmd_tuples > 0
rescue ActiveRecord::InvalidForeignKey => e
Rails.logger.info "Retry on InvalidForeignKey: #{e}"
retry
end while result.cmd_tuples > 0
end
SQL
iteration += 1
records += result.cmd_tuples
Rails.logger.info "Building user_interacted_projects table, batch ##{iteration} complete, created #{records} overall"
Kernel.sleep(SLEEP_TIME) if result.cmd_tuples > 0
end while result.cmd_tuples > 0
end
execute "ANALYZE user_interacted_projects"
def remove_duplicates
execute <<~SQL
WITH numbered AS (select ctid, ROW_NUMBER() OVER (PARTITION BY (user_id, project_id)) as row_number, user_id, project_id from user_interacted_projects)
DELETE FROM user_interacted_projects WHERE ctid IN (SELECT ctid FROM numbered WHERE row_number > 1);
SQL
end
def remove_without_project
execute "DELETE FROM user_interacted_projects WHERE NOT EXISTS (SELECT 1 FROM projects WHERE id = user_interacted_projects.project_id)"
end
private
def remove_without_user
execute "DELETE FROM user_interacted_projects WHERE NOT EXISTS (SELECT 1 FROM users WHERE id = user_interacted_projects.user_id)"
end
def create_fk(table, target, column)
return if foreign_key_exists?(table, column)
add_foreign_key table, target, column: column, on_delete: :cascade
end
def create_unique_index
return if index_exists_by_name?(:user_interacted_projects, UNIQUE_INDEX_NAME)
add_index :user_interacted_projects, [:project_id, :user_id], unique: true, name: UNIQUE_INDEX_NAME
end
# Protect table against concurrent data changes while still allowing reads
def with_table_lock(*tables)
ActiveRecord::Base.connection.transaction do
execute "LOCK TABLE #{tables.join(", ")} IN SHARE MODE"
yield
end
end
def with_index(*args)
add_concurrent_index(*args) unless index_exists?(*args)
......@@ -118,7 +153,18 @@ class BuildUserInteractedProjectsTable < ActiveRecord::Migration
LEFT JOIN user_interacted_projects ucp USING (user_id, project_id)
WHERE ucp.user_id IS NULL
SQL
unless index_exists?(:user_interacted_projects, [:project_id, :user_id])
add_concurrent_index :user_interacted_projects, [:project_id, :user_id], unique: true, name: UNIQUE_INDEX_NAME
end
unless foreign_key_exists?(:user_interacted_projects, :user_id)
add_concurrent_foreign_key :user_interacted_projects, :users, column: :user_id, on_delete: :cascade
end
unless foreign_key_exists?(:user_interacted_projects, :project_id)
add_concurrent_foreign_key :user_interacted_projects, :projects, column: :project_id, on_delete: :cascade
end
end
end
end
......@@ -859,6 +859,19 @@ into similar problems in the future (e.g. when new tables are created).
BackgroundMigrationWorker.perform_in(delay_interval * index, job_class_name, [start_id, end_id])
end
end
def foreign_key_exists?(table, column)
foreign_keys(table).any? do |key|
key.options[:column] == column.to_s
end
end
# Rails' index_exists? doesn't work when you only give it a table and index
# name. As such we have to use some extra code to check if an index exists for
# a given name.
def index_exists_by_name?(table, index)
indexes(table).map(&:name).include?(index)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment