Commit a7822ccd authored by Gabriel Mazetto

Merge branch 'ag-verifiables-update-worker' into 'master'

[Geo] Add a worker to backfill verification state

See merge request gitlab-org/gitlab!69301
parents 7886d7db aee40a34
......@@ -8,6 +8,7 @@ module Geo
DEFAULT_VERIFICATION_BATCH_SIZE = 10
DEFAULT_REVERIFICATION_BATCH_SIZE = 1000
DEFAULT_VERIFICATION_STATE_BACKFILL_BATCH_SIZE = 10000
included do
event :checksum_succeeded
......@@ -46,7 +47,13 @@ module Geo
# Secondaries don't need to run this since they will receive an event for each
# rechecksummed resource: https://gitlab.com/gitlab-org/gitlab/-/issues/13842
::Geo::ReverificationBatchWorker.perform_with_capacity(replicable_name) if ::Gitlab::Geo.primary?
return unless ::Gitlab::Geo.primary?
::Geo::ReverificationBatchWorker.perform_with_capacity(replicable_name)
if Feature.enabled?(:verification_state_backfill_worker, default_enabled: :yaml) && verification_query_class.separate_verification_state_table?
::Geo::VerificationStateBackfillWorker.perform_async(replicable_name)
end
end
# Called by VerificationBatchWorker.
......@@ -107,6 +114,20 @@ module Geo
reverify_batch(batch_size: reverification_batch_size)
end
# Runs one backfill pass for this replicable by handing the model and the
# configured batch size to Geo::VerificationStateBackfillService. The service
# gets the next batch of rows from the replicable table, and inserts and
# deletes corresponding rows in the verification state table.
#
# Nothing is done when the current Geo node is not the primary.
#
# @return [Boolean] false when not on the primary; otherwise whatever the
#   service's #execute returns (whether any rows needed to be inserted or
#   deleted)
# @raise [StandardError] errors raised by the service are logged and re-raised
def backfill_verification_state_table
  return false unless Gitlab::Geo.primary?

  backfill_service = Geo::VerificationStateBackfillService.new(
    model,
    batch_size: verification_state_backfill_batch_size
  )

  backfill_service.execute
rescue StandardError => error
  log_error("Error while updating verifiables", error)
  raise
end
# If primary, query the model table.
# If secondary, query the registry table.
def verification_query_class
......@@ -123,6 +144,11 @@ module Geo
DEFAULT_REVERIFICATION_BATCH_SIZE
end
# Batch size that backfill_verification_state_table passes to
# Geo::VerificationStateBackfillService.
#
# @return [Integer] number of records to check for backfill per batch job
def verification_state_backfill_batch_size
DEFAULT_VERIFICATION_STATE_BACKFILL_BATCH_SIZE
end
def checksummed_count
# When verification is disabled, this returns nil.
# Bonus: This causes the progress bar to be hidden.
......
......@@ -30,6 +30,10 @@ module EE
scope :with_verification_state, ->(state) { joins(:merge_request_diff_detail).where(merge_request_diff_details: { verification_state: verification_state_value(state) }) }
scope :checksummed, -> { joins(:merge_request_diff_detail).where.not(merge_request_diff_details: { verification_checksum: nil } ) }
scope :not_checksummed, -> { joins(:merge_request_diff_detail).where(merge_request_diff_details: { verification_checksum: nil } ) }
# Creates the verification state record for this diff by delegating to
# create_merge_request_diff_detail.
# NOTE(review): presumably the creator generated for the
# merge_request_diff_detail association — confirm.
def create_verification_details
create_merge_request_diff_detail
end
end
class_methods do
......@@ -45,19 +49,9 @@ module EE
.merge(object_storage_scope(node))
end
override :verification_state_table_name
def verification_state_table_name
'merge_request_diff_details'
end
override :verification_state_model_key
def verification_state_model_key
'merge_request_diff_id'
end
override :verification_arel_table
def verification_arel_table
MergeRequestDiffDetail.arel_table
override :verification_state_table_class
def verification_state_table_class
MergeRequestDiffDetail
end
private
......
# frozen_string_literal: true
module Geo
  # Backfills the verification state table of a replicable model, one range
  # of primary keys per call: rows are created for verifiable records that
  # lack one, and removed for records that are no longer verifiable.
  class VerificationStateBackfillService
    include ::Gitlab::Geo::LogHelpers

    attr_reader :replicable_model, :batch_size

    delegate :primary_key,
             :verification_state_table_name,
             :verification_state_model_key,
             :verification_arel_table,
             :verification_state_table_class,
             to: :replicable_model

    # @param replicable_model [Class] model class whose verification state
    #   table is backfilled
    # @param batch_size [Integer] batch size handed to the batcher that
    #   computes the next range
    def initialize(replicable_model, batch_size:)
      @replicable_model = replicable_model
      @batch_size = batch_size
    end

    # Gets the next batch of rows from the replicable table, and inserts and
    # deletes corresponding rows in the verification state table.
    #
    # @return [Boolean] whether any rows needed to be inserted or deleted;
    #   false when the batcher has no range to hand out
    # @raise [StandardError] any error is logged and re-raised
    def execute
      range = next_range!
      # Honour the documented Boolean contract instead of leaking nil.
      return false unless range

      handle_differences_in_verifiables(range)
    rescue StandardError => e
      log_error("Error while updating #{verification_state_table_name}", e)
      raise
    end

    private

    # @return [Range, nil] the next range of a batch of records; execute
    #   treats a falsy value as "nothing to do"
    def next_range!
      Gitlab::Geo::BaseBatcher.new(
        replicable_model,
        verification_state_table_class,
        verification_state_model_key,
        key: batcher_key,
        batch_size: batch_size
      ).next_range!
    end

    # Key passed to the batcher, distinct per replicable model.
    def batcher_key
      "verification_backfill:#{replicable_model.name.parameterize}"
    end

    # This method creates or deletes verification details records.
    # It creates for available verifiables where records don't exist yet.
    # These would be replicable records that have recently become scoped as
    # available_verifiables.
    # New replicable records will automatically create child records in the
    # verification details table, hence not created in this method.
    # When a replicable record is no longer a part of the scope
    # available_verifiables, it is deleted.
    # When a replicable record is deleted, the child record in the verification
    # details table is automatically removed, hence not deleted in this method.
    #
    # @param range [Range] primary key range being reconciled
    # @return [Boolean] whether any rows needed to be inserted or deleted
    def handle_differences_in_verifiables(range)
      verifiable_ids = replicable_model.pluck_verifiable_ids_in_range(range) || []
      verification_details_ids = replicable_model.pluck_verification_details_ids_in_range(range) || []

      for_creation_ids = verifiable_ids - verification_details_ids
      for_deletion_ids = verification_details_ids - verifiable_ids

      create_verification_details(range, for_creation_ids)
      delete_verification_details(range, for_deletion_ids)

      for_creation_ids.any? || for_deletion_ids.any?
    end

    # Creates the missing verification details, one replicable at a time.
    def create_verification_details(range, for_creation_ids)
      # Only the side effect matters here, so iterate instead of mapping.
      replicable_model.find(for_creation_ids).each(&:create_verification_details)

      log_created(range, for_creation_ids)
    end

    # Deletes verification details whose replicable is no longer verifiable.
    def delete_verification_details(range, for_deletion_ids)
      verification_state_table_class.delete(for_deletion_ids)

      log_deleted(range, for_deletion_ids)
    end

    def log_created(range, for_creation_ids)
      log_debug(
        "Created verification details for ",
        {
          replicable_model: replicable_model.name,
          start: range.first,
          finish: range.last,
          created: for_creation_ids
        }
      )
    end

    def log_deleted(range, for_deletion_ids)
      log_debug(
        "Deleted verification details for ",
        {
          replicable_model: replicable_model.name,
          start: range.first,
          finish: range.last,
          deleted: for_deletion_ids
        }
      )
    end
  end
end
......@@ -705,6 +705,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: geo:geo_verification_state_backfill
:worker_name: Geo::VerificationStateBackfillWorker
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: geo:geo_verification_timeout
:worker_name: Geo::VerificationTimeoutWorker
:feature_category: :geo_replication
......
# frozen_string_literal: true
module Geo
  # Worker that backfills the verification state table belonging to the
  # replicable identified by `replicable_name`. The actual work is delegated
  # to the replicator class' backfill_verification_state_table.
  class VerificationStateBackfillWorker # rubocop:disable Scalability/IdempotentWorker
    include ApplicationWorker

    data_consistency :always

    include ::Gitlab::Geo::LogHelpers
    include GeoQueue
    prepend Reenqueuer

    LEASE_TIMEOUT = 1.minute

    # @param replicable_name [String] name used to look up the replicator class
    # @return the result of backfill_verification_state_table on that class
    def perform(replicable_name)
      ::Gitlab::Geo::Replicator
        .for_replicable_name(replicable_name)
        .backfill_verification_state_table
    end

    # @return [ActiveSupport::Duration] lease duration for this worker
    def lease_timeout
      LEASE_TIMEOUT
    end
  end
end
---
name: verification_state_backfill_worker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/69301
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/340648
milestone: '14.4'
type: development
group: group::geo
default_enabled: false
......@@ -18,6 +18,12 @@ module Gitlab
geo_logger.error(data)
end
# Writes a debug-level entry to the Geo logger.
#
# @param message [String] passed to base_log_data to build the entry
# @param details [Hash, nil] extra fields merged into the entry; skipped
#   when nil
# @return the result of geo_logger.debug
def log_debug(message, details = {})
  payload = base_log_data(message).tap do |entry|
    entry.merge!(details) if details
  end

  geo_logger.debug(payload)
end
protected
def base_log_data(message)
......
......@@ -92,6 +92,10 @@ module Gitlab
end
end
# Creates the record that stores verification state for a replicable.
# There is no default implementation: the class that includes this concern
# has to define it.
#
# @raise [NotImplementedError] always, unless overridden
def create_verification_details
  # Name the missing method so the failure is actionable in logs.
  raise NotImplementedError, "#{__method__} must be implemented by the class that includes this concern"
end
private_class_method :start_verification_batch
private_class_method :start_verification_batch_query
private_class_method :start_verification_batch_subselect
......@@ -202,27 +206,24 @@ module Gitlab
.lock('FOR UPDATE SKIP LOCKED') # rubocop:disable CodeReuse/ActiveRecord
end
# Overridden in ReplicableRegistry
# This method can also be overridden in the replicable model class that
# includes this concern to specify the primary key of the database
# table that stores verification state
# Override this method in the class that includes this concern to specify
# a different ActiveRecord class to store verification state.
# See module EE::MergeRequestDiff for example.
#
# @return [Class] by default the including class itself, i.e. verification
#   state is stored on the model's own table
def verification_state_table_class
self
end
# Overridden in ReplicableRegistry
def verification_state_model_key
  # Derived from verification_state_table_class so that a class overriding
  # it gets the key of its own verification state table. The former bare
  # `self.primary_key` lookup was evaluated and discarded, so it is gone.
  verification_state_table_class.primary_key
end
# Override this method in the class that includes this concern to specify
# a different database table to store verification state
# See module EE::MergeRequestDiff for example
def verification_state_table_name
  # Derived from verification_state_table_class; the former bare `table_name`
  # lookup was evaluated and discarded, so it is gone.
  verification_state_table_class.table_name
end
# Override this method in the class that includes this concern to specify
# a different arel table to store verification state
# See module EE::MergeRequestDiff for example
def verification_arel_table
  # Derived from verification_state_table_class; the former bare `arel_table`
  # lookup was evaluated and discarded, so it is gone.
  verification_state_table_class.arel_table
end
# Fail verification for records which started verification a long time ago
......@@ -289,6 +290,27 @@ module Gitlab
WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
SQL
end
# rubocop:disable CodeReuse/ActiveRecord
# Lists the keys already present in the verification state table whose
# verification_state_model_key value falls within `range`.
#
# @param range [Range] range of key values to look at
# @return [Array] plucked verification_state_model_key values
def pluck_verification_details_ids_in_range(range)
  key_column = verification_state_model_key

  verification_state_table_class.where(key_column => range).pluck(key_column)
end
# rubocop:enable CodeReuse/ActiveRecord
# Lists the primary keys of records in the available_verifiables scope whose
# primary key falls within `range`.
#
# @param range [Range] range of primary key values to look at
# @return [Array] plucked primary keys
def pluck_verifiable_ids_in_range(range)
  verifiables_in_range = available_verifiables.primary_key_in(range)

  verifiables_in_range.pluck_primary_key
end
# Tells whether verification state lives outside the model's own table, by
# comparing verification_state_table_name with table_name.
#
# @return [Boolean] whether primary checksum data is stored in a table
#   separate from the model table
def separate_verification_state_table?
  state_table = verification_state_table_name

  state_table != table_name
end
end
# Overridden by Geo::VerifiableRegistry
......
......@@ -144,6 +144,19 @@ RSpec.shared_examples 'a verifiable replicator' do
described_class.trigger_background_verification
end
# The backfill worker is only enqueued when the feature flag is enabled and
# the model reports a separate verification state table, so both conditions
# are forced in the hook below.
context 'when verification_state_backfill_worker feature flag is enabled' do
before do
stub_feature_flags(verification_state_backfill_worker: true)
# Also asserts that the model is asked about its verification state table.
expect(described_class.model).to receive(:separate_verification_state_table?).and_return(true)
end
it 'enqueues VerificationStateBackfillWorker' do
expect(::Geo::VerificationStateBackfillWorker).to receive(:perform_async).with(described_class.replicable_name)
described_class.trigger_background_verification
end
end
context 'for a Geo secondary' do
it 'does not enqueue ReverificationBatchWorker' do
stub_secondary_node
......@@ -184,6 +197,28 @@ RSpec.shared_examples 'a verifiable replicator' do
end
end
describe '.backfill_verification_state_table' do
  context 'when on secondary' do
    before do
      stub_secondary_node
    end

    it 'returns false' do
      # The method returns a literal false on non-primary nodes, so assert
      # exactly that rather than any falsy value.
      expect(described_class.backfill_verification_state_table).to be(false)
    end
  end

  it 'calls VerificationStateBackfillService' do
    stub_primary_node

    expect_next_instance_of(Geo::VerificationStateBackfillService) do |service|
      expect(service).to receive(:execute).and_return(true)
    end

    described_class.backfill_verification_state_table
  end
end
describe '.verify_batch' do
context 'when there are records needing verification' do
let(:another_replicator) { double('another_replicator', verify: true) }
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::VerificationStateBackfillService, :geo do
  # Externally stored diff, i.e. a record the service is expected to keep a
  # verification detail row for.
  # NOTE(review): the `from(1)` expectation below implies the detail row is
  # created together with the diff — confirm against the model/factory.
  let(:replicable_1) { FactoryBot.create(:merge_request_diff, :external) }

  subject(:job) { described_class.new(MergeRequestDiff, batch_size: 1000) }

  describe '#execute' do
    context 'when a replicable is missing a corresponding verifiable' do
      before do
        replicable_1.merge_request_diff_detail.destroy!
      end

      it 'adds a new verifiable' do
        expect { job.execute }.to change { MergeRequestDiffDetail.count }.from(0).to(1)
      end
    end

    context 'when some replicables were removed from scope' do
      before do
        replicable_1.update_attribute(:stored_externally, false)
      end

      it 'deletes the verifiable' do
        expect { job.execute }.to change { MergeRequestDiffDetail.count }.from(1).to(0)
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Covers queue configuration, the shared re-enqueue/rate-limit behaviour, and
# how the return value of the backfill service drives re-enqueueing.
RSpec.describe Geo::VerificationStateBackfillWorker, :geo do
include EE::GeoHelpers
include ExclusiveLeaseHelpers
subject(:job) { described_class.new }
# Passed to #perform here and by the shared examples below.
let(:job_args) { 'MergeRequestDiff' }
let_it_be(:primary) { create(:geo_node, :primary) }
before do
stub_current_geo_node(primary)
end
it 'uses a geo queue' do
expect(subject.sidekiq_options_hash).to include(
'queue' => 'geo:geo_verification_state_backfill',
'queue_namespace' => :geo
)
end
describe '#perform' do
it_behaves_like 'reenqueuer'
it_behaves_like '#perform is rate limited to 1 call per', 5.seconds
context 'when service is executed' do
# Each nested context supplies `execute_return`, the value the stubbed
# service hands back to the worker.
before do
expect_next_instance_of(Geo::VerificationStateBackfillService) do |service|
expect(service).to receive(:execute).and_return(execute_return)
end
end
context 'when Geo::VerificationStateBackfillService#execute returns true' do
let(:execute_return) { true }
it 'returns true' do
expect(subject.perform(job_args)).to be_truthy
end
it 'worker gets reenqueued' do
expect(Geo::VerificationStateBackfillWorker).to receive(:perform_async)
subject.perform(job_args)
end
end
context 'when VerificationStateBackfillService#execute returns false' do
let(:execute_return) { false }
it 'returns false' do
expect(subject.perform(job_args)).to be_falsey
end
it 'worker does not get reenqueued (we will wait until next cronjob)' do
expect(Geo::VerificationStateBackfillWorker).not_to receive(:perform_async)
subject.perform(job_args)
end
end
end
end
end
# frozen_string_literal: true
# Expects `subject` to be a job/worker instance and
# `job_args` to be arguments to #perform if it takes arguments
RSpec.shared_examples 'reenqueuer' do
before do
allow(subject).to receive(:sleep) # faster tests
end
# Runs the job once, passing `job_args` only when the including spec defines it.
let(:subject_perform) { defined?(job_args) ? subject.perform(job_args) : subject.perform }
it 'implements lease_timeout' do
expect(subject.lease_timeout).to be_a(ActiveSupport::Duration)
end
......@@ -18,12 +21,13 @@ RSpec.shared_examples 'reenqueuer' do
it 'tries to obtain a lease' do
  expect_to_obtain_exclusive_lease(subject.lease_key)

  # Run the job exactly once, through subject_perform so that job_args are
  # passed to workers whose #perform takes arguments.
  subject_perform
end
end
end
# Expects `subject` to be a job/worker instance and
# `job_args` to be arguments to #perform if it takes arguments
RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_duration|
before do
# Allow Timecop freeze and travel without the block form
......@@ -38,13 +42,15 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
Timecop.safe_mode = true
end
# Runs the job once, passing `job_args` only when the including spec defines it.
let(:subject_perform) { defined?(job_args) ? subject.perform(job_args) : subject.perform }
context 'when the work finishes in 0 seconds' do
let(:actual_duration) { 0 }
it 'sleeps exactly the minimum duration' do
  expect(subject).to receive(:sleep).with(a_value_within(0.01).of(minimum_duration))

  # Run the job exactly once, through subject_perform so that job_args are
  # passed to workers whose #perform takes arguments.
  subject_perform
end
end
......@@ -54,7 +60,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'sleeps 90% of minimum duration' do
  expect(subject).to receive(:sleep).with(a_value_within(0.01).of(0.9 * minimum_duration))

  # Run the job exactly once, through subject_perform so that job_args are
  # passed to workers whose #perform takes arguments.
  subject_perform
end
end
......@@ -64,7 +70,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'sleeps 10% of minimum duration' do
  expect(subject).to receive(:sleep).with(a_value_within(0.01).of(0.1 * minimum_duration))

  # Run the job exactly once, through subject_perform so that job_args are
  # passed to workers whose #perform takes arguments.
  subject_perform
end
end
......@@ -74,7 +80,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'does not sleep' do
  expect(subject).not_to receive(:sleep)

  # Run the job exactly once, through subject_perform so that job_args are
  # passed to workers whose #perform takes arguments.
  subject_perform
end
end
......@@ -84,7 +90,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'does not sleep' do
  expect(subject).not_to receive(:sleep)

  # Run the job exactly once, through subject_perform so that job_args are
  # passed to workers whose #perform takes arguments.
  subject_perform
end
end
......@@ -94,7 +100,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'does not sleep' do
  expect(subject).not_to receive(:sleep)

  # Run the job exactly once, through subject_perform so that job_args are
  # passed to workers whose #perform takes arguments.
  subject_perform
end
end
......
......@@ -257,6 +257,7 @@ RSpec.describe 'Every Sidekiq worker' do
'Geo::Scheduler::SchedulerWorker' => 3,
'Geo::Scheduler::Secondary::SchedulerWorker' => 3,
'Geo::VerificationBatchWorker' => 0,
'Geo::VerificationStateBackfillWorker' => false,
'Geo::VerificationTimeoutWorker' => false,
'Geo::VerificationWorker' => 3,
'GeoRepositoryDestroyWorker' => 3,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment