Commit bbefa0f1 authored by Mike Kozono's avatar Mike Kozono

Continuously backfill checksums on the primary

- Triggered by Geo::VerificationCronWorker every minute
- The work is done by Geo::VerificationBatchWorker
- Reuses LimitedCapacity::Worker concern
- Concurrent workers do not pick up the same records
- Fixes a bug in LimitedCapacity::Worker so it passes the job args
- To do: Killed jobs can cause stuck "started" records. A follow up MR
  is in progress.
parent 6a71676d
......@@ -73,7 +73,7 @@ module LimitedCapacity
raise
ensure
job_tracker.remove(jid)
report_prometheus_metrics
report_prometheus_metrics(*args)
re_enqueue(*args) unless exception
end
......
......@@ -26,6 +26,14 @@ module Geo::ReplicableRegistry
def registry_consistency_worker_enabled?
replicator_class.enabled?
end
def verification_pending_batch(batch_size:)
[] # TODO: https://gitlab.com/gitlab-org/gitlab/-/issues/13981
end
def verification_failed_batch(batch_size:)
[] # TODO: https://gitlab.com/gitlab-org/gitlab/-/issues/13981
end
end
def replicator_class
......
......@@ -6,9 +6,13 @@ module Geo
include Delay
DEFAULT_VERIFICATION_BATCH_SIZE = 10
class_methods do
extend Gitlab::Utils::Override
delegate :verification_pending_batch, :verification_failed_batch, :needs_verification_count, to: :verification_query_class
# If replication is disabled, then so is verification.
override :verification_enabled?
def verification_enabled?
......@@ -32,7 +36,59 @@ module Geo
def trigger_background_verification
return false unless verification_enabled?
# TODO: ::Geo::VerificationBatchWorker.perform_with_capacity(self)
::Geo::VerificationBatchWorker.perform_with_capacity(replicable_name)
end
# Called by VerificationBatchWorker.
#
# - Gets next batch of records that need to be verified
# - Verifies them
#
def verify_batch
self.replicator_batch_to_verify.each(&:verify)
end
# Called by VerificationBatchWorker.
#
# - Asks the DB how many things still need to be verified (with a limit)
# - Converts that to a number of batches
#
# @return [Integer] number of batches of verification work remaining, up to the given maximum
def remaining_verification_batch_count(max_batch_count:)
needs_verification_count(limit: max_batch_count * verification_batch_size)
.fdiv(verification_batch_size)
.ceil
end
# @return [Array<Gitlab::Geo::Replicator>] batch of replicators which need to be verified
def replicator_batch_to_verify
model_record_id_batch_to_verify.map do |id|
self.new(model_record_id: id)
end
end
# @return [Array<Integer>] list of IDs for this replicator's model which need to be verified
def model_record_id_batch_to_verify
ids = verification_pending_batch(batch_size: verification_batch_size)
remaining_batch_size = verification_batch_size - ids.size
if remaining_batch_size > 0
ids += verification_failed_batch(batch_size: remaining_batch_size)
end
ids
end
# If primary, query the model table.
# If secondary, query the registry table.
def verification_query_class
Gitlab::Geo.secondary? ? registry_class : model
end
# @return [Integer] number of records to verify per batch job
def verification_batch_size
DEFAULT_VERIFICATION_BATCH_SIZE
end
def checksummed_count
......
......@@ -523,6 +523,14 @@
:weight: 1
:idempotent: true
:tags: []
- :name: geo:geo_verification_batch
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: personal_access_tokens:personal_access_tokens_groups_policy
:feature_category: :authentication_and_authorization
:has_external_dependencies:
......
......@@ -33,7 +33,7 @@ module Geo
end
def max_capacity
current_node.verification_max_capacity
Gitlab::Geo.verification_max_capacity_per_replicator_class
end
def schedule_job(project_id)
......
......@@ -36,7 +36,7 @@ module Geo
end
def max_capacity
current_node.verification_max_capacity
Gitlab::Geo.verification_max_capacity_per_replicator_class
end
# rubocop:disable CodeReuse/ActiveRecord
......
# frozen_string_literal: true
module Geo
class VerificationBatchWorker
include ApplicationWorker
include GeoQueue
include LimitedCapacity::Worker
include ::Gitlab::Geo::LogHelpers
idempotent!
loggable_arguments 0
def perform_work(replicable_name)
replicator_class = replicator_class_for(replicable_name)
replicator_class.verify_batch
end
# This method helps answer the questions:
#
# - Should this worker be reenqueued after it finishes its batch?
# - How many workers should the parent cron worker start?
#
def remaining_work_count(replicable_name)
replicator_class = replicator_class_for(replicable_name)
@remaining_work_count ||= replicator_class
.remaining_verification_batch_count(max_batch_count: remaining_capacity)
end
def replicator_class_for(replicable_name)
@replicator_class ||= ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name)
end
def max_running_jobs
Gitlab::Geo.verification_max_capacity_per_replicator_class
end
end
end
......@@ -175,5 +175,34 @@ module Gitlab
def self.verification_enabled_replicator_classes
REPLICATOR_CLASSES.select(&:verification_enabled?)
end
# Returns the maximum number of concurrent verification jobs per Replicator
# class.
#
# On the primary:
#
# - Geo::VerificationBatchWorker will run up to this many instances of
# itself, for each Replicator class with verification enabled.
# - Geo::RepositoryVerification::Primary::ShardWorker will run up to this
# many concurrent Geo::RepositoryVerification::Primary::SingleWorker
# jobs.
#
# On each secondary:
#
# - Geo::VerificationBatchWorker will run up to this many instances of
# itself, for each Replicator class with verification enabled.
# - Geo::RepositoryVerification::Secondary::ShardWorker will run up to this
# many concurrent Geo::RepositoryVerification::Secondary::SingleWorker
# jobs.
#
# @return [Integer] the maximum number of concurrent verification jobs per Replicator class
def self.verification_max_capacity_per_replicator_class
num_legacy_verification_schedulers = 1 # it handles both Projects and Wikis
num_verifiable_replicator_classes = verification_enabled_replicator_classes.size + num_legacy_verification_schedulers
capacity = current_node.verification_max_capacity / num_verifiable_replicator_classes
[1, capacity].max # at least 1
end
end
end
......@@ -33,8 +33,6 @@ module Gitlab
scope :verification_failed, -> { with_verification_state(:verification_failed) }
scope :checksummed, -> { where.not(verification_checksum: nil) }
scope :not_checksummed, -> { where(verification_checksum: nil) }
scope :never_attempted_verification, -> { verification_pending.where(verification_started_at: nil) }
scope :needs_verification_again, -> { verification_pending.where.not(verification_started_at: nil).or(verification_failed) }
scope :verification_timed_out, -> { verification_started.where("verification_started_at < ?", VERIFICATION_TIMEOUT.ago) }
scope :needs_verification, -> { verification_pending.or(verification_failed) }
# rubocop:enable CodeReuse/ActiveRecord
......@@ -96,6 +94,68 @@ module Gitlab
def verification_state_value(state_string)
VERIFICATION_STATE_VALUES[state_string]
end
# Returns IDs of records that are pending verification.
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_pending_batch(batch_size:)
relation = verification_pending.order(Gitlab::Database.nulls_first_order(:verified_at)).limit(batch_size) # rubocop:disable CodeReuse/ActiveRecord
start_verification_batch(relation)
end
# Returns IDs of records that failed to verify (calculate and save checksum).
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_failed_batch(batch_size:)
relation = verification_failed.order(Gitlab::Database.nulls_first_order(:verification_retry_at)).limit(batch_size) # rubocop:disable CodeReuse/ActiveRecord
start_verification_batch(relation)
end
# @return [Integer] number of records that need verification
def needs_verification_count(limit:)
needs_verification.limit(limit).count # rubocop:disable CodeReuse/ActiveRecord
end
# Atomically marks the records as verification_started, with a
# verification_started_at time, and returns the primary key of each
# updated row. This allows VerificationBatchWorker to concurrently get
# unique batches of primary keys to process.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Array<Integer>] primary key of each updated row
def start_verification_batch(relation)
query = start_verification_batch_query(relation)
# This query performs a write, so we need to wrap it in a transaction
# to stick to the primary database.
self.transaction do
self.connection.execute(query).to_a.map { |row| row[self.primary_key] }
end
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_started, with a verification_started_at time,
# and returns the primary key of each updated row.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return primary key of each row
def start_verification_batch_query(relation)
started_enum_value = VERIFICATION_STATE_VALUES[:verification_started]
<<~SQL.squish
UPDATE #{table_name}
SET "verification_state" = #{started_enum_value},
"verification_started_at" = NOW()
WHERE #{self.primary_key} IN (#{relation.select(self.primary_key).to_sql})
RETURNING #{self.primary_key}
SQL
end
end
# Convenience method to update checksum and transition to success state.
......
......@@ -19,14 +19,123 @@ RSpec.describe Gitlab::Geo::VerificationState do
before do
stub_dummy_replicator_class
stub_dummy_model_class
subject.verification_started
subject.save!
end
subject { DummyModel.new }
describe '.verification_pending_batch' do
# Insert 2 records for a total of 3 with subject
let!(:other_pending_records) do
DummyModel.insert_all([
{ verification_state: pending_value, verified_at: 7.days.ago },
{ verification_state: pending_value, verified_at: 6.days.ago }
], returning: [:id])
end
let(:pending_value) { DummyModel.verification_state_value(:verification_pending) }
let(:other_pending_ids) { other_pending_records.map { |result| result['id'] } }
before do
subject.save!
end
it 'returns IDs of rows pending verification' do
expect(subject.class.verification_pending_batch(batch_size: 3)).to include(subject.id)
end
it 'marks verification as started' do
subject.class.verification_pending_batch(batch_size: 3)
expect(subject.reload.verification_started?).to be_truthy
expect(subject.verification_started_at).to be_present
end
it 'limits with batch_size and orders records by verified_at with NULLs first' do
expected = [subject.id, other_pending_ids.first]
# `match_array` instead of `eq` because the UPDATE query does not
# guarantee that results are returned in the same order as the subquery
# used to SELECT the correct batch.
expect(subject.class.verification_pending_batch(batch_size: 2)).to match_array(expected)
end
context 'other verification states' do
it 'does not include them' do
subject.verification_started!
expect(subject.class.verification_pending_batch(batch_size: 3)).not_to include(subject.id)
subject.verification_succeeded_with_checksum!('foo', Time.current)
expect(subject.class.verification_pending_batch(batch_size: 3)).not_to include(subject.id)
subject.verification_started
subject.verification_failed_with_message!('foo')
expect(subject.class.verification_pending_batch(batch_size: 3)).not_to include(subject.id)
end
end
end
describe '.verification_failed_batch' do
# Insert 2 records for a total of 3 with subject
let!(:other_failed_records) do
DummyModel.insert_all([
{ verification_state: failed_value, verification_retry_at: 7.days.ago },
{ verification_state: failed_value, verification_retry_at: 6.days.ago }
], returning: [:id])
end
let(:failed_value) { DummyModel.verification_state_value(:verification_failed) }
let(:other_failed_ids) { other_failed_records.map { |result| result['id'] } }
before do
subject.verification_started!
subject.verification_failed_with_message!('foo')
end
it 'returns IDs of rows pending verification' do
expect(subject.class.verification_failed_batch(batch_size: 3)).to include(subject.id)
end
it 'marks verification as started' do
subject.class.verification_failed_batch(batch_size: 3)
expect(subject.reload.verification_started?).to be_truthy
expect(subject.verification_started_at).to be_present
end
it 'limits with batch_size and orders records by verification_retry_at with NULLs first' do
expected = other_failed_ids
# `match_array` instead of `eq` because the UPDATE query does not
# guarantee that results are returned in the same order as the subquery
# used to SELECT the correct batch.
expect(subject.class.verification_failed_batch(batch_size: 2)).to match_array(expected)
end
context 'other verification states' do
it 'does not include them' do
subject.verification_started!
expect(subject.class.verification_failed_batch(batch_size: 5)).not_to include(subject.id)
subject.verification_succeeded_with_checksum!('foo', Time.current)
expect(subject.class.verification_failed_batch(batch_size: 5)).not_to include(subject.id)
subject.verification_pending!
expect(subject.class.verification_failed_batch(batch_size: 5)).not_to include(subject.id)
end
end
end
describe '#verification_succeeded_with_checksum!' do
before do
subject.verification_started!
end
context 'when the resource was updated during checksum calculation' do
let(:calculation_started_at) { subject.verification_started_at - 1.second }
......@@ -54,6 +163,7 @@ RSpec.describe Gitlab::Geo::VerificationState do
it 'saves the error message and increments retry counter' do
error = double('error', message: 'An error message')
subject.verification_started!
subject.verification_failed_with_message!('Failure to calculate checksum', error)
expect(subject.reload.verification_failed?).to be_truthy
......
......@@ -395,4 +395,38 @@ RSpec.describe Gitlab::Geo, :geo, :request_store do
end
end
end
describe '.verification_max_capacity_per_replicator_class' do
let(:verification_max_capacity) { 12 }
let(:node) { double('node', verification_max_capacity: verification_max_capacity) }
before do
stub_current_geo_node(node)
end
context 'when there are no Replicator classes with verification enabled' do
it 'returns the total capacity' do
stub_feature_flags(geo_package_file_verification: false)
stub_feature_flags(geo_framework_verification: false)
expect(described_class.verification_max_capacity_per_replicator_class).to eq(verification_max_capacity)
end
end
context 'when there is 1 Replicator class with verification enabled' do
it 'returns half capacity' do
stub_feature_flags(geo_framework_verification: false)
expect(described_class.verification_max_capacity_per_replicator_class).to eq(verification_max_capacity / 2)
end
end
context 'when total capacity is set lower than the number of Replicators' do
let(:verification_max_capacity) { 1 }
it 'returns 1' do
expect(described_class.verification_max_capacity_per_replicator_class).to eq(1)
end
end
end
end
......@@ -2,6 +2,13 @@
# This should be included on any Replicator which implements verification.
#
# Expected let variables:
#
# - primary
# - secondary
# - model_record
# - replicator
#
RSpec.shared_examples 'a verifiable replicator' do
include EE::GeoHelpers
......@@ -113,6 +120,151 @@ RSpec.shared_examples 'a verifiable replicator' do
end
end
describe '.trigger_background_verification' do
context 'when verification is enabled' do
before do
allow(described_class).to receive(:verification_enabled?).and_return(true)
end
it 'enqueues VerificationBatchWorker' do
expect(::Geo::VerificationBatchWorker).to receive(:perform_with_capacity).with(described_class.replicable_name)
described_class.trigger_background_verification
end
end
context 'when verification is disabled' do
before do
allow(described_class).to receive(:verification_enabled?).and_return(false)
end
it 'does not enqueue VerificationBatchWorker' do
expect(::Geo::VerificationBatchWorker).not_to receive(:perform_with_capacity)
described_class.trigger_background_verification
end
end
end
describe '.verify_batch' do
context 'when there are records needing verification' do
let(:another_replicator) { double('another_replicator', verify: true) }
let(:replicators) { [replicator, another_replicator] }
before do
allow(described_class).to receive(:replicator_batch_to_verify).and_return(replicators)
end
it 'calls #verify on each replicator' do
expect(replicator).to receive(:verify)
expect(another_replicator).to receive(:verify)
described_class.verify_batch
end
end
end
describe '.remaining_verification_batch_count' do
it 'converts needs_verification_count to number of batches' do
expected_limit = 40
expect(described_class).to receive(:needs_verification_count).with(limit: expected_limit).and_return(21)
expect(described_class.remaining_verification_batch_count(max_batch_count: 4)).to eq(3)
end
end
describe '.replicator_batch_to_verify' do
it 'returns usable Replicator instances' do
model_record.save!
expect(described_class).to receive(:model_record_id_batch_to_verify).and_return([model_record.id])
first_result = described_class.replicator_batch_to_verify.first
expect(first_result.class).to eq(described_class)
expect(first_result.model_record_id).to eq(model_record.id)
end
end
describe '.model_record_id_batch_to_verify' do
let(:pending_ids) { [1, 2] }
before do
allow(described_class).to receive(:verification_batch_size).and_return(verification_batch_size)
allow(described_class).to receive(:verification_pending_batch).with(batch_size: verification_batch_size).and_return(pending_ids)
end
context 'when the batch is filled by pending rows' do
let(:verification_batch_size) { 2 }
it 'returns IDs of pending rows' do
expect(described_class.model_record_id_batch_to_verify).to eq(pending_ids)
end
it 'does not call .verification_failed_batch' do
expect(described_class).not_to receive(:verification_failed_batch)
described_class.model_record_id_batch_to_verify
end
end
context 'when that batch is not filled by pending rows' do
let(:failed_ids) { [3, 4, 5] }
let(:verification_batch_size) { 5 }
it 'includes IDs of failed rows' do
remaining_capacity = verification_batch_size - pending_ids.size
allow(described_class).to receive(:verification_failed_batch).with(batch_size: remaining_capacity).and_return(failed_ids)
result = described_class.model_record_id_batch_to_verify
expect(result).to include(*pending_ids)
expect(result).to include(*failed_ids)
end
end
end
describe '.verification_pending_batch' do
context 'when current node is a primary' do
it 'delegates to the model class of the replicator' do
expect(described_class.model).to receive(:verification_pending_batch)
described_class.verification_pending_batch
end
end
context 'when current node is a secondary' do
it 'delegates to the registry class of the replicator' do
stub_current_geo_node(secondary)
expect(described_class.registry_class).to receive(:verification_pending_batch)
described_class.verification_pending_batch
end
end
end
describe '.verification_failed_batch' do
context 'when current node is a primary' do
it 'delegates to the model class of the replicator' do
expect(described_class.model).to receive(:verification_failed_batch)
described_class.verification_failed_batch
end
end
context 'when current node is a secondary' do
it 'delegates to the registry class of the replicator' do
stub_current_geo_node(secondary)
expect(described_class.registry_class).to receive(:verification_failed_batch)
described_class.verification_failed_batch
end
end
end
describe '#after_verifiable_update' do
it 'calls verify_async if needed' do
expect(replicator).to receive(:verify_async)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::VerificationBatchWorker, :geo do
include EE::GeoHelpers
let(:replicable_name) { 'widget' }
let(:replicator_class) { double('widget_replicator_class') }
let(:node) { double('node') }
before do
stub_current_geo_node(node)
end
subject(:job) { described_class.new }
it 'uses a Geo queue' do
expect(job.sidekiq_options_hash).to include(
'queue' => 'geo:geo_verification_batch',
'queue_namespace' => :geo
)
end
describe '#perform' do
it 'calls verify_batch' do
allow(::Gitlab::Geo::Replicator).to receive(:for_replicable_name).with(replicable_name).and_return(replicator_class)
allow(::Gitlab::Geo).to receive(:verification_max_capacity_per_replicator_class).and_return(1)
allow(replicator_class).to receive(:remaining_verification_batch_count).and_return(1)
expect(replicator_class).to receive(:verify_batch)
job.perform(replicable_name)
end
end
describe '#remaining_work_count' do
it 'returns remaining_verification_batch_count' do
expected = 7
args = { max_batch_count: 95 }
allow(job).to receive(:remaining_capacity).and_return(args[:max_batch_count])
allow(::Gitlab::Geo::Replicator).to receive(:for_replicable_name).with(replicable_name).and_return(replicator_class)
expect(replicator_class).to receive(:remaining_verification_batch_count).with(args).and_return(expected)
expect(job.remaining_work_count(replicable_name)).to eq(expected)
end
end
describe '#max_running_jobs' do
it 'returns verification_max_capacity_per_replicator_class' do
allow(::Gitlab::Geo).to receive(:verification_max_capacity_per_replicator_class).and_return(123)
expect(job.max_running_jobs).to eq(123)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment