Commit 074ee55e authored by Adam Hegyi's avatar Adam Hegyi

Merge branch '343547-fair-queueing-for-lfk' into 'master'

Implement fair queueing for LFK

See merge request gitlab-org/gitlab!79116
parents 467b5c6a 68410b2d
...@@ -46,17 +46,39 @@ class LooseForeignKeys::DeletedRecord < Gitlab::Database::SharedModel ...@@ -46,17 +46,39 @@ class LooseForeignKeys::DeletedRecord < Gitlab::Database::SharedModel
.to_a .to_a
end end
def self.mark_records_processed(all_records) def self.mark_records_processed(records)
# Run a query for each partition to optimize the row lookup by primary key (partition, id) update_by_partition(records) do |partitioned_scope|
partitioned_scope.update_all(status: :processed)
end
end
def self.reschedule(records, consume_after)
update_by_partition(records) do |partitioned_scope|
partitioned_scope.update_all(consume_after: consume_after, cleanup_attempts: 0)
end
end
def self.increment_attempts(records)
update_by_partition(records) do |partitioned_scope|
# Naive incrementing of the cleanup_attempts is good enough for us.
partitioned_scope.update_all('cleanup_attempts = cleanup_attempts + 1')
end
end
def self.update_by_partition(records)
update_count = 0 update_count = 0
all_records.group_by(&:partition_number).each do |partition, records_within_partition| # Run a query for each partition to optimize the row lookup by primary key (partition, id)
update_count += status_pending records.group_by(&:partition_number).each do |partition, records_within_partition|
partitioned_scope = status_pending
.for_partition(partition) .for_partition(partition)
.where(id: records_within_partition.pluck(:id)) .where(id: records_within_partition.pluck(:id))
.update_all(status: :processed)
update_count += yield(partitioned_scope)
end end
update_count update_count
end end
private_class_method :update_by_partition
end end
...@@ -2,6 +2,9 @@ ...@@ -2,6 +2,9 @@
module LooseForeignKeys module LooseForeignKeys
class BatchCleanerService class BatchCleanerService
CLEANUP_ATTEMPTS_BEFORE_RESCHEDULE = 3
CONSUME_AFTER_RESCHEDULE = 5.minutes
def initialize(parent_table:, loose_foreign_key_definitions:, deleted_parent_records:, modification_tracker: LooseForeignKeys::ModificationTracker.new) def initialize(parent_table:, loose_foreign_key_definitions:, deleted_parent_records:, modification_tracker: LooseForeignKeys::ModificationTracker.new)
@parent_table = parent_table @parent_table = parent_table
@loose_foreign_key_definitions = loose_foreign_key_definitions @loose_foreign_key_definitions = loose_foreign_key_definitions
...@@ -11,15 +14,31 @@ module LooseForeignKeys ...@@ -11,15 +14,31 @@ module LooseForeignKeys
:loose_foreign_key_processed_deleted_records, :loose_foreign_key_processed_deleted_records,
'The number of processed loose foreign key deleted records' 'The number of processed loose foreign key deleted records'
) )
@deleted_records_rescheduled_count = Gitlab::Metrics.counter(
:loose_foreign_key_rescheduled_deleted_records,
'The number of rescheduled loose foreign key deleted records'
)
@deleted_records_incremented_count = Gitlab::Metrics.counter(
:loose_foreign_key_incremented_deleted_records,
'The number of loose foreign key deleted records with incremented cleanup_attempts'
)
end end
def execute def execute
loose_foreign_key_definitions.each do |loose_foreign_key_definition| loose_foreign_key_definitions.each do |loose_foreign_key_definition|
run_cleaner_service(loose_foreign_key_definition, with_skip_locked: true) run_cleaner_service(loose_foreign_key_definition, with_skip_locked: true)
break if modification_tracker.over_limit?
if modification_tracker.over_limit?
handle_over_limit
break
end
run_cleaner_service(loose_foreign_key_definition, with_skip_locked: false) run_cleaner_service(loose_foreign_key_definition, with_skip_locked: false)
break if modification_tracker.over_limit?
if modification_tracker.over_limit?
handle_over_limit
break
end
end end
return if modification_tracker.over_limit? return if modification_tracker.over_limit?
...@@ -27,12 +46,33 @@ module LooseForeignKeys ...@@ -27,12 +46,33 @@ module LooseForeignKeys
# At this point, all associations are cleaned up, we can update the status of the parent records # At this point, all associations are cleaned up, we can update the status of the parent records
update_count = LooseForeignKeys::DeletedRecord.mark_records_processed(deleted_parent_records) update_count = LooseForeignKeys::DeletedRecord.mark_records_processed(deleted_parent_records)
deleted_records_counter.increment({ table: parent_table, db_config_name: LooseForeignKeys::DeletedRecord.connection.pool.db_config.name }, update_count) deleted_records_counter.increment({ table: parent_table, db_config_name: db_config_name }, update_count)
end end
private private
attr_reader :parent_table, :loose_foreign_key_definitions, :deleted_parent_records, :modification_tracker, :deleted_records_counter attr_reader :parent_table, :loose_foreign_key_definitions, :deleted_parent_records, :modification_tracker, :deleted_records_counter, :deleted_records_rescheduled_count, :deleted_records_incremented_count
def handle_over_limit
return if Feature.disabled?(:lfk_fair_queueing)
records_to_reschedule = []
records_to_increment = []
deleted_parent_records.each do |deleted_record|
if deleted_record.cleanup_attempts >= CLEANUP_ATTEMPTS_BEFORE_RESCHEDULE
records_to_reschedule << deleted_record
else
records_to_increment << deleted_record
end
end
reschedule_count = LooseForeignKeys::DeletedRecord.reschedule(records_to_reschedule, CONSUME_AFTER_RESCHEDULE.from_now)
deleted_records_rescheduled_count.increment({ table: parent_table, db_config_name: db_config_name }, reschedule_count)
increment_count = LooseForeignKeys::DeletedRecord.increment_attempts(records_to_increment)
deleted_records_incremented_count.increment({ table: parent_table, db_config_name: db_config_name }, increment_count)
end
def record_result(cleaner, result) def record_result(cleaner, result)
if cleaner.async_delete? if cleaner.async_delete?
...@@ -60,5 +100,9 @@ module LooseForeignKeys ...@@ -60,5 +100,9 @@ module LooseForeignKeys
end end
end end
end end
def db_config_name
LooseForeignKeys::DeletedRecord.connection.pool.db_config.name
end
end end
end end
---
name: lfk_fair_queueing
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79116
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351082
milestone: '14.8'
type: development
group: group::sharding
default_enabled: false
# frozen_string_literal: true
class AddCleanupAttemptsToLooseForeignKeysDeletedRecords < Gitlab::Database::Migration[1.0]
enable_lock_retries!
def up
add_column :loose_foreign_keys_deleted_records, :cleanup_attempts, :smallint, default: 0
end
def down
remove_column :loose_foreign_keys_deleted_records, :cleanup_attempts
end
end
c50a21430e52fc6ccbe7ab4aba99ae3a48d1c69858f7886da115f54e19fc27ea
\ No newline at end of file
...@@ -279,6 +279,7 @@ CREATE TABLE loose_foreign_keys_deleted_records ( ...@@ -279,6 +279,7 @@ CREATE TABLE loose_foreign_keys_deleted_records (
created_at timestamp with time zone DEFAULT now() NOT NULL, created_at timestamp with time zone DEFAULT now() NOT NULL,
fully_qualified_table_name text NOT NULL, fully_qualified_table_name text NOT NULL,
consume_after timestamp with time zone DEFAULT now(), consume_after timestamp with time zone DEFAULT now(),
cleanup_attempts smallint DEFAULT 0,
CONSTRAINT check_1a541f3235 CHECK ((char_length(fully_qualified_table_name) <= 150)) CONSTRAINT check_1a541f3235 CHECK ((char_length(fully_qualified_table_name) <= 150))
) )
PARTITION BY LIST (partition); PARTITION BY LIST (partition);
...@@ -6,15 +6,15 @@ RSpec.describe LooseForeignKeys::DeletedRecord, type: :model do ...@@ -6,15 +6,15 @@ RSpec.describe LooseForeignKeys::DeletedRecord, type: :model do
let_it_be(:table) { 'public.projects' } let_it_be(:table) { 'public.projects' }
describe 'class methods' do describe 'class methods' do
let_it_be(:deleted_record_1) { described_class.create!(fully_qualified_table_name: table, primary_key_value: 5) } let_it_be(:deleted_record_1) { described_class.create!(fully_qualified_table_name: table, primary_key_value: 5, cleanup_attempts: 2) }
let_it_be(:deleted_record_2) { described_class.create!(fully_qualified_table_name: table, primary_key_value: 1) } let_it_be(:deleted_record_2) { described_class.create!(fully_qualified_table_name: table, primary_key_value: 1, cleanup_attempts: 0) }
let_it_be(:deleted_record_3) { described_class.create!(fully_qualified_table_name: 'public.other_table', primary_key_value: 3) } let_it_be(:deleted_record_3) { described_class.create!(fully_qualified_table_name: 'public.other_table', primary_key_value: 3) }
let_it_be(:deleted_record_4) { described_class.create!(fully_qualified_table_name: table, primary_key_value: 1) } # duplicate let_it_be(:deleted_record_4) { described_class.create!(fully_qualified_table_name: table, primary_key_value: 1, cleanup_attempts: 1) } # duplicate
let(:records) { described_class.load_batch_for_table(table, 10) }
describe '.load_batch_for_table' do describe '.load_batch_for_table' do
it 'loads records and orders them by creation date' do it 'loads records and orders them by creation date' do
records = described_class.load_batch_for_table(table, 10)
expect(records).to eq([deleted_record_1, deleted_record_2, deleted_record_4]) expect(records).to eq([deleted_record_1, deleted_record_2, deleted_record_4])
end end
...@@ -27,13 +27,38 @@ RSpec.describe LooseForeignKeys::DeletedRecord, type: :model do ...@@ -27,13 +27,38 @@ RSpec.describe LooseForeignKeys::DeletedRecord, type: :model do
describe '.mark_records_processed' do describe '.mark_records_processed' do
it 'updates all records' do it 'updates all records' do
records = described_class.load_batch_for_table(table, 10)
described_class.mark_records_processed(records) described_class.mark_records_processed(records)
expect(described_class.status_pending.count).to eq(1) expect(described_class.status_pending.count).to eq(1)
expect(described_class.status_processed.count).to eq(3) expect(described_class.status_processed.count).to eq(3)
end end
end end
describe '.reschedule' do
it 'reschedules all records' do
time = Time.zone.parse('2022-01-01').utc
update_count = described_class.reschedule(records, time)
expect(update_count).to eq(records.size)
records.each(&:reload)
expect(records).to all(have_attributes(
cleanup_attempts: 0,
consume_after: time
))
end
end
describe '.increment_attempts' do
it 'increaments the cleanup_attempts column' do
described_class.increment_attempts(records)
expect(deleted_record_1.reload.cleanup_attempts).to eq(3)
expect(deleted_record_2.reload.cleanup_attempts).to eq(1)
expect(deleted_record_4.reload.cleanup_attempts).to eq(2)
end
end
end end
describe 'sliding_list partitioning' do describe 'sliding_list partitioning' do
......
...@@ -115,4 +115,82 @@ RSpec.describe LooseForeignKeys::BatchCleanerService do ...@@ -115,4 +115,82 @@ RSpec.describe LooseForeignKeys::BatchCleanerService do
expect(loose_fk_child_table_2.where(parent_id_with_different_column: other_parent_record.id).count).to eq(2) expect(loose_fk_child_table_2.where(parent_id_with_different_column: other_parent_record.id).count).to eq(2)
end end
end end
describe 'fair queueing' do
context 'when the execution is over the limit' do
let(:modification_tracker) { instance_double(LooseForeignKeys::ModificationTracker) }
let(:over_limit_return_values) { [true] }
let(:deleted_record) { LooseForeignKeys::DeletedRecord.load_batch_for_table('public._test_loose_fk_parent_table', 1).first }
let(:deleted_records_rescheduled_counter) { Gitlab::Metrics.registry.get(:loose_foreign_key_rescheduled_deleted_records) }
let(:deleted_records_incremented_counter) { Gitlab::Metrics.registry.get(:loose_foreign_key_incremented_deleted_records) }
let(:cleaner) do
described_class.new(parent_table: '_test_loose_fk_parent_table',
loose_foreign_key_definitions: loose_foreign_key_definitions,
deleted_parent_records: LooseForeignKeys::DeletedRecord.load_batch_for_table('public._test_loose_fk_parent_table', 100),
modification_tracker: modification_tracker
)
end
before do
parent_record_1.delete
allow(modification_tracker).to receive(:over_limit?).and_return(*over_limit_return_values)
allow(modification_tracker).to receive(:add_deletions)
end
context 'when the deleted record is under the maximum allowed cleanup attempts' do
it 'updates the cleanup_attempts column', :aggregate_failures do
deleted_record.update!(cleanup_attempts: 1)
cleaner.execute
expect(deleted_record.reload.cleanup_attempts).to eq(2)
expect(deleted_records_incremented_counter.get(table: loose_fk_parent_table.table_name, db_config_name: 'main')).to eq(1)
end
context 'when the deleted record is above the maximum allowed cleanup attempts' do
it 'reschedules the record', :aggregate_failures do
deleted_record.update!(cleanup_attempts: LooseForeignKeys::BatchCleanerService::CLEANUP_ATTEMPTS_BEFORE_RESCHEDULE + 1)
freeze_time do
cleaner.execute
expect(deleted_record.reload).to have_attributes(
cleanup_attempts: 0,
consume_after: 5.minutes.from_now
)
expect(deleted_records_rescheduled_counter.get(table: loose_fk_parent_table.table_name, db_config_name: 'main')).to eq(1)
end
end
end
describe 'when over limit happens on the second cleanup call without skip locked' do
# over_limit? is called twice, we test here the 2nd call
# - When invoking cleanup with SKIP LOCKED
# - When invoking cleanup (no SKIP LOCKED)
let(:over_limit_return_values) { [false, true] }
it 'updates the cleanup_attempts column' do
expect(cleaner).to receive(:run_cleaner_service).twice
deleted_record.update!(cleanup_attempts: 1)
cleaner.execute
expect(deleted_record.reload.cleanup_attempts).to eq(2)
end
end
end
context 'when the lfk_fair_queueing FF is off' do
before do
stub_feature_flags(lfk_fair_queueing: false)
end
it 'does nothing' do
expect { cleaner.execute }.not_to change { deleted_record.reload.cleanup_attempts }
end
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment