Commit d0553fc9 authored by Michael Kozono's avatar Michael Kozono

Merge branch 'ag-re-factor-scope-replicable-model' into 'master'

Geo: Refactor verification related concerns

See merge request gitlab-org/gitlab!77257
parents 279b5324 5b4bd7a5
......@@ -175,8 +175,8 @@ That's all of the required database changes.
#### Step 1. Implement replication and verification
- [ ] Add the following lines to the `cool_widget` model to accomplish some important tasks:
- Include `Gitlab::Geo::ReplicableModel` in the `CoolWidget` class, and specify the Replicator class `with_replicator Geo::CoolWidgetReplicator`.
- Include the `::Gitlab::Geo::VerificationState` concern.
- Include `::Geo::ReplicableModel` in the `CoolWidget` class, and specify the Replicator class `with_replicator Geo::CoolWidgetReplicator`.
- Include the `::Geo::VerifiableModel` concern.
- Delegate verification related methods to the `cool_widget_state` model.
- For verification, override some scopes to use the `cool_widget_states` table instead of the model table.
- Implement the `verification_state_object` method to return the object that holds
......@@ -192,8 +192,8 @@ That's all of the required database changes.
class CoolWidget < ApplicationRecord
...
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator Geo::CoolWidgetReplicator
......
......@@ -179,8 +179,8 @@ That's all of the required database changes.
#### Step 1. Implement replication and verification
- [ ] Add the following lines to the `cool_widget` model to accomplish some important tasks:
- Include `Gitlab::Geo::ReplicableModel` in the `CoolWidget` class, and specify the Replicator class `with_replicator Geo::CoolWidgetReplicator`.
- Include the `::Gitlab::Geo::VerificationState` concern.
- Include `::Geo::ReplicableModel` in the `CoolWidget` class, and specify the Replicator class `with_replicator Geo::CoolWidgetReplicator`.
- Include the `::Geo::VerifiableModel` concern.
- Delegate verification related methods to the `cool_widget_state` model.
- For verification, override some scopes to use the `cool_widget_states` table instead of the model table.
- Implement the `verification_state_object` method to return the object that holds
......@@ -194,8 +194,8 @@ That's all of the required database changes.
class CoolWidget < ApplicationRecord
...
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator Geo::CoolWidgetReplicator
......
......@@ -117,7 +117,7 @@ the model code:
```ruby
class Packages::PackageFile < ApplicationRecord
include ::Gitlab::Geo::ReplicableModel
include ::Geo::ReplicableModel
with_replicator Geo::PackageFileReplicator
end
......
# frozen_string_literal: true
module Geo
module ReplicableModel
extend ActiveSupport::Concern
include Checksummable
included do
# If this hook turns out not to apply to all Models, perhaps we should extract a `ReplicableBlobModel`
after_create_commit -> { replicator.handle_after_create_commit if replicator.respond_to?(:handle_after_create_commit) }
after_destroy -> { replicator.handle_after_destroy if replicator.respond_to?(:handle_after_destroy) }
# Temporarily defining `verification_succeeded` and
# `verification_failed` for unverified models while verification is
# under development to avoid breaking GeoNodeStatusCheck code.
# TODO: Remove these after including `::Geo::VerificationState` on
# all models. https://gitlab.com/gitlab-org/gitlab/-/issues/280768
scope :verification_succeeded, -> { none }
scope :verification_failed, -> { none }
# These scopes are intended to be overridden as needed
scope :available_replicables, -> { all }
# On primary, `verifiables` are records that can be checksummed and/or are replicable.
# On secondary, `verifiables` are records that have already been replicated
# and (ideally) have been checksummed on the primary
scope :verifiables, -> { self.respond_to?(:with_files_stored_locally) ? available_replicables.with_files_stored_locally : available_replicables }
# When storing verification details in the same table as the model,
# the scope `available_verifiables` returns only those records
# that are eligible for verification, i.e. the same as the scope
# `verifiables`.
# When using a separate table to store verification details,
# the scope `available_verifiables` should return all records
# from the separate table because the separate table will
# always only have records corresponding to replicables that are verifiable.
# For this, override the scope in the replicable model, e.g. like so in
# `MergeRequestDiff`,
# `scope :available_verifiables, -> { joins(:merge_request_diff_detail) }`
scope :available_verifiables, -> { verifiables }
end
class_methods do
# Associate current model with specified replicator
#
# @param [Gitlab::Geo::Replicator] klass
def with_replicator(klass)
raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
class_eval <<-RUBY, __FILE__, __LINE__ + 1
define_method :replicator do
@_replicator ||= klass.new(model_record: self)
end
define_singleton_method :replicator_class do
@_replicator_class ||= klass
end
RUBY
end
end
# Geo Replicator
#
# @abstract
# @return [Gitlab::Geo::Replicator]
def replicator
raise NotImplementedError, 'There is no Replicator defined for this model'
end
def in_replicables_for_current_secondary?
self.class.replicables_for_current_secondary(self).exists?
end
end
end
# frozen_string_literal: true
module Geo
# This concern is included on Model classes (as opposed to Registry classes)
# to manage their verification states. Note that this concern does not handle
# how verification is performed; see `VerifiableReplicator`.
#
# It handles both cases where verification state is stored in a separate
# table or when it is stored in the same table as the model.
module VerifiableModel
extend ActiveSupport::Concern
include ::Geo::VerificationState
included do
def save_verification_details
return unless self.class.separate_verification_state_table?
return unless self.class.verifiables.primary_key_in(self).exists?
# During a transaction, `verification_state_object` could be built before
# a value for `verification_state_model_key` exists. So we check for that
# before saving the `verification_state_object`
unless verification_state_object.persisted?
verification_state_object[self.class.verification_state_model_key] = self.id
end
verification_state_object.save!
end
# Implement this method in the class that includes this concern to specify
# a different ActiveRecord association name that stores the verification state
# See module EE::MergeRequestDiff for example
def verification_state_object
raise NotImplementedError if self.class.separate_verification_state_table?
self
end
end
class_methods do
include Delay
def pluck_verification_details_ids_in_range(range)
verification_state_table_class
.where(self.verification_state_model_key => range)
.pluck(self.verification_state_model_key)
end
def pluck_verifiable_ids_in_range(range)
self
.verifiables
.primary_key_in(range)
.pluck_primary_key
end
# @return whether primary checksum data is stored in a table separate
# from the model table
def separate_verification_state_table?
verification_state_table_name != table_name
end
end
end
end
......@@ -4,13 +4,13 @@ module Geo
module VerifiableRegistry
extend ActiveSupport::Concern
extend ::Gitlab::Utils::Override
include ::Gitlab::Geo::VerificationState
include ::Geo::VerificationState
class_methods do
extend ::Gitlab::Utils::Override
# Overrides a method in `Gitlab::Geo::VerificationState`. This method is
# used by `Gitlab::Geo::VerificationState.start_verification_batch` to
# Overrides a method in `::Geo::VerificationState`. This method is
# used by `::Geo::VerificationState.start_verification_batch` to
# produce a query which must return values of the primary key of the
# *model*, not of the *registry*. We need this so we can instantiate
# Replicators.
......
# frozen_string_literal: true
module Geo
# This concern is included on VerifiableModel and on VerifiableRegistry to
# manage their verification fields.
module VerificationState
extend ActiveSupport::Concern
include ::ShaAttribute
include Delay
include EachBatch
include Gitlab::Geo::LogHelpers
VERIFICATION_STATE_VALUES = {
verification_pending: 0,
verification_started: 1,
verification_succeeded: 2,
verification_failed: 3
}.freeze
VERIFICATION_TIMEOUT = 8.hours
included do
sha_attribute :verification_checksum
scope :verification_pending, -> { available_verifiables.with_verification_state(:verification_pending) }
scope :verification_started, -> { available_verifiables.with_verification_state(:verification_started) }
scope :verification_succeeded, -> { available_verifiables.with_verification_state(:verification_succeeded) }
scope :verification_failed, -> { available_verifiables.with_verification_state(:verification_failed) }
scope :checksummed, -> { where.not(verification_checksum: nil) }
scope :not_checksummed, -> { where(verification_checksum: nil) }
scope :verification_timed_out, -> { available_verifiables.where(verification_arel_table[:verification_state].eq(1)).where(verification_arel_table[:verification_started_at].lt(VERIFICATION_TIMEOUT.ago)) }
scope :verification_retry_due, -> { where(verification_arel_table[:verification_retry_at].eq(nil).or(verification_arel_table[:verification_retry_at].lt(Time.current))) }
scope :needs_verification, -> { available_verifiables.merge(with_verification_state(:verification_pending).or(with_verification_state(:verification_failed).verification_retry_due)) }
scope :needs_reverification, -> { verification_succeeded.where("verified_at < ?", ::Gitlab::Geo.current_node.minimum_reverification_interval.days.ago) }
state_machine :verification_state, initial: :verification_pending do
state :verification_pending, value: VERIFICATION_STATE_VALUES[:verification_pending]
state :verification_started, value: VERIFICATION_STATE_VALUES[:verification_started]
state :verification_succeeded, value: VERIFICATION_STATE_VALUES[:verification_succeeded] do
validates :verification_checksum, presence: true
end
state :verification_failed, value: VERIFICATION_STATE_VALUES[:verification_failed] do
validates :verification_failure, presence: true
end
before_transition any => :verification_started do |instance, _|
instance.verification_started_at = Time.current
end
before_transition [:verification_pending, :verification_started, :verification_succeeded] => :verification_pending do |instance, _|
instance.clear_verification_failure_fields!
end
before_transition verification_failed: :verification_pending do |instance, _|
# If transitioning from verification_failed, then don't clear
# verification_retry_count and verification_retry_at to ensure
# progressive backoff of syncs-due-to-verification-failures
instance.verification_failure = nil
end
before_transition any => :verification_failed do |instance, _|
instance.before_verification_failed
end
before_transition any => :verification_succeeded do |instance, _|
instance.verified_at = Time.current
instance.clear_verification_failure_fields!
end
event :verification_started do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_started
end
event :verification_succeeded do
transition verification_started: :verification_succeeded
end
event :verification_failed do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_failed
end
event :verification_pending do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_pending
end
end
private_class_method :start_verification_batch
private_class_method :start_verification_batch_query
private_class_method :start_verification_batch_subselect
end
class_methods do
include Delay
def verification_state_value(state_string)
VERIFICATION_STATE_VALUES[state_string]
end
# Returns IDs of records that are pending verification.
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_pending_batch(batch_size:)
relation = verification_pending.order(Gitlab::Database.nulls_first_order(:verified_at)).limit(batch_size)
start_verification_batch(relation)
end
# Returns IDs of records that failed to verify (calculate and save checksum).
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_failed_batch(batch_size:)
relation = verification_failed.verification_retry_due.order(Gitlab::Database.nulls_first_order(:verification_retry_at)).limit(batch_size)
start_verification_batch(relation)
end
# @return [Integer] number of records that need verification
def needs_verification_count(limit:)
needs_verification.limit(limit).count
end
# @return [Integer] number of records that need reverification
def needs_reverification_count(limit:)
needs_reverification.limit(limit).count
end
# Atomically marks the records as verification_started, with a
# verification_started_at time, and returns the primary key of each
# updated row. This allows VerificationBatchWorker to concurrently get
# unique batches of primary keys to process.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Array<Integer>] primary key of each updated row
def start_verification_batch(relation)
query = start_verification_batch_query(relation)
# This query performs a write, so we need to wrap it in a transaction
# to stick to the primary database.
self.transaction do
self.connection.execute(query).to_a.map do |row|
row[self.verification_state_model_key.to_s]
end
end
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_started, with a verification_started_at time,
# and returns the primary key of each updated row.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return primary key of each row
def start_verification_batch_query(relation)
started_enum_value = VERIFICATION_STATE_VALUES[:verification_started]
<<~SQL.squish
UPDATE #{verification_state_table_name}
SET "verification_state" = #{started_enum_value},
"verification_started_at" = NOW()
WHERE #{self.verification_state_model_key} IN (#{start_verification_batch_subselect(relation).to_sql})
RETURNING #{self.verification_state_model_key}
SQL
end
# This query locks the rows during the transaction, and skips locked
# rows so that this query can be run concurrently, safely and reasonably
# efficiently.
# See https://gitlab.com/gitlab-org/gitlab/-/issues/300051#note_496889565
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which selects the primary keys to update
def start_verification_batch_subselect(relation)
relation
.select(self.verification_state_model_key)
.lock('FOR UPDATE SKIP LOCKED')
end
# Override this method in the class that includes this concern to specify
# a different ActiveRecord class to store verification state
# See module EE::MergeRequestDiff for example
def verification_state_table_class
self
end
# Overridden in ReplicableRegistry
def verification_state_model_key
verification_state_table_class.primary_key
end
def verification_state_table_name
verification_state_table_class.table_name
end
def verification_arel_table
verification_state_table_class.arel_table
end
def verification_timed_out_batch_query
return verification_timed_out unless separate_verification_state_table?
verification_state_table_class.where(self.verification_state_model_key => verification_timed_out)
end
# Fail verification for records which started verification a long time ago
def fail_verification_timeouts
attrs = {
verification_state: verification_state_value(:verification_failed),
verification_failure: "Verification timed out after #{VERIFICATION_TIMEOUT}",
verification_checksum: nil,
verification_retry_count: 1,
verification_retry_at: next_retry_time(1),
verified_at: Time.current
}
verification_timed_out_batch_query.each_batch do |relation|
relation.update_all(attrs)
end
end
# Reverifies batch and returns the number of records.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
def reverify_batch(batch_size:)
relation = reverification_batch_relation(batch_size: batch_size)
mark_as_verification_pending(relation)
end
# Returns IDs of records that need re-verification.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
def reverification_batch_relation(batch_size:)
needs_reverification.order(:verified_at).limit(batch_size)
end
# Atomically marks the records as verification_pending.
# Returns the number of records set to be referified.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Integer] number of records
def mark_as_verification_pending(relation)
query = mark_as_verification_pending_query(relation)
self.connection.execute(query).cmd_tuples
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_pending
# and returns the number of updated rows.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return the number of rows
def mark_as_verification_pending_query(relation)
pending_enum_value = VERIFICATION_STATE_VALUES[:verification_pending]
<<~SQL.squish
UPDATE #{verification_state_table_name}
SET "verification_state" = #{pending_enum_value}
WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
SQL
end
end
# Overridden by Geo::VerifiableRegistry
def clear_verification_failure_fields!
self.verification_retry_count = 0
self.verification_retry_at = nil
self.verification_failure = nil
end
# Overridden by Geo::VerifiableRegistry
def before_verification_failed
self.verification_retry_count ||= 0
self.verification_retry_count += 1
self.verification_retry_at = self.next_retry_time(self.verification_retry_count)
self.verified_at = Time.current
end
# Provides a safe and easy way to manage the verification state for a
# synchronous checksum calculation.
#
# @yieldreturn [String] calculated checksum value
def track_checksum_attempt!(&block)
# This line only applies to Geo::VerificationWorker, not
# Geo::VerificationBatchWorker, since the latter sets the whole batch to
# "verification_started" in the same DB query that fetches the batch.
verification_started! unless verification_started?
calculation_started_at = Time.current
checksum = yield
track_checksum_result!(checksum, calculation_started_at)
rescue StandardError => e
# Reset any potential changes from track_checksum_result, i.e.
# verification_retry_count may have been cleared.
reset
verification_failed_with_message!('Error during verification', e)
end
# Convenience method to update checksum and transition to success state.
#
# @param [String] checksum value generated by the checksum routine
# @param [DateTime] calculation_started_at the moment just before the
# checksum routine was called
def verification_succeeded_with_checksum!(checksum, calculation_started_at)
self.verification_checksum = checksum
self.verification_succeeded!
if resource_updated_during_checksum?(calculation_started_at)
# just let backfill pick it up
self.verification_pending!
elsif Gitlab::Geo.primary?
self.replicator.handle_after_checksum_succeeded
end
end
# Convenience method to update failure message and transition to failed
# state.
#
# @param [String] message error information
# @param [StandardError] error exception
def verification_failed_with_message!(message, error = nil)
log_error(message, error)
self.verification_failure = message
self.verification_failure += ": #{error.message}" if error.respond_to?(:message)
self.verification_failure.truncate(255)
self.verification_checksum = nil
self.verification_failed!
end
private
# Records the calculated checksum result
#
# Overridden by ReplicableRegistry so it can also compare with primary
# checksum.
#
# @param [String] calculated checksum value
# @param [Time] when checksum calculation was started
def track_checksum_result!(checksum, calculation_started_at)
verification_succeeded_with_checksum!(checksum, calculation_started_at)
end
def resource_updated_during_checksum?(calculation_started_at)
self.reset.verification_started_at > calculation_started_at
end
end
end
......@@ -6,8 +6,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::PipelineArtifactReplicator
end
......
......@@ -11,8 +11,8 @@ module EE
STORE_COLUMN = :file_store
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::LfsObjectReplicator
......
......@@ -5,9 +5,9 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Geo::ReplicableModel
include ObjectStorable
include ::Gitlab::Geo::VerificationState
include ::Geo::VerifiableModel
STORE_COLUMN = :external_diff_store
......
......@@ -6,8 +6,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::PackageFileReplicator
end
......
......@@ -5,8 +5,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::PagesDeploymentReplicator
......
......@@ -5,8 +5,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
include FromUnion
with_replicator ::Geo::SnippetRepositoryReplicator
......
......@@ -6,8 +6,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::TerraformStateVersionReplicator
......
......@@ -10,8 +10,8 @@ module EE
prepended do
include ::Gitlab::SQL::Pattern
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::UploadReplicator
......
# frozen_string_literal: true
class GroupWikiRepository < ApplicationRecord
include ::Gitlab::Geo::ReplicableModel
include ::Geo::ReplicableModel
include EachBatch
include Shardable
......
# frozen_string_literal: true
module Gitlab
module Geo
module ReplicableModel
extend ActiveSupport::Concern
include Checksummable
included do
# If this hook turns out not to apply to all Models, perhaps we should extract a `ReplicableBlobModel`
after_create_commit -> { replicator.handle_after_create_commit if replicator.respond_to?(:handle_after_create_commit) }
after_destroy -> { replicator.handle_after_destroy if replicator.respond_to?(:handle_after_destroy) }
# Temporarily defining `verification_succeeded` and
# `verification_failed` for unverified models while verification is
# under development to avoid breaking GeoNodeStatusCheck code.
# TODO: Remove these after including `Gitlab::Geo::VerificationState` on
# all models. https://gitlab.com/gitlab-org/gitlab/-/issues/280768
scope :verification_succeeded, -> { none }
scope :verification_failed, -> { none }
# These scopes are intended to be overridden as needed
scope :available_replicables, -> { all }
# On primary, `verifiables` are records that can be checksummed and/or are replicable.
# On secondary, `verifiables` are records that have already been replicated
# and (ideally) have been checksummed on the primary
scope :verifiables, -> { self.respond_to?(:with_files_stored_locally) ? available_replicables.with_files_stored_locally : available_replicables }
# When storing verification details in the same table as the model,
# the scope `available_verifiables` returns only those records
# that are eligible for verification, i.e. the same as the scope
# `verifiables`.
# When using a separate table to store verification details,
# the scope `available_verifiables` should return all records
# from the separate table because the separate table will
# always only have records corresponding to replicables that are verifiable.
# For this, override the scope in the replicable model, e.g. like so in
# `MergeRequestDiff`,
# `scope :available_verifiables, -> { joins(:merge_request_diff_detail) }`
scope :available_verifiables, -> { verifiables }
end
class_methods do
# Associate current model with specified replicator
#
# @param [Gitlab::Geo::Replicator] klass
def with_replicator(klass)
raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
class_eval <<-RUBY, __FILE__, __LINE__ + 1
define_method :replicator do
@_replicator ||= klass.new(model_record: self)
end
define_singleton_method :replicator_class do
@_replicator_class ||= klass
end
RUBY
end
end
# Geo Replicator
#
# @abstract
# @return [Gitlab::Geo::Replicator]
def replicator
raise NotImplementedError, 'There is no Replicator defined for this model'
end
def in_replicables_for_current_secondary?
self.class.replicables_for_current_secondary(self).exists?
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This concern is included on ActiveRecord classes to manage their
# verification fields. This concern does not handle how verification is
# performed.
#
# This is a separate concern from Gitlab::Geo::ReplicableModel because e.g.
# MergeRequestDiff stores its verification state in a separate table with
# the association to MergeRequestDiffDetail.
module VerificationState
extend ActiveSupport::Concern
include ::ShaAttribute
include Delay
include EachBatch
include Gitlab::Geo::LogHelpers
VERIFICATION_STATE_VALUES = {
verification_pending: 0,
verification_started: 1,
verification_succeeded: 2,
verification_failed: 3
}.freeze
VERIFICATION_TIMEOUT = 8.hours
included do
sha_attribute :verification_checksum
# rubocop:disable CodeReuse/ActiveRecord
scope :verification_pending, -> { available_verifiables.with_verification_state(:verification_pending) }
scope :verification_started, -> { available_verifiables.with_verification_state(:verification_started) }
scope :verification_succeeded, -> { available_verifiables.with_verification_state(:verification_succeeded) }
scope :verification_failed, -> { available_verifiables.with_verification_state(:verification_failed) }
scope :checksummed, -> { where.not(verification_checksum: nil) }
scope :not_checksummed, -> { where(verification_checksum: nil) }
scope :verification_timed_out, -> { available_verifiables.where(verification_arel_table[:verification_state].eq(1)).where(verification_arel_table[:verification_started_at].lt(VERIFICATION_TIMEOUT.ago)) }
scope :verification_retry_due, -> { where(verification_arel_table[:verification_retry_at].eq(nil).or(verification_arel_table[:verification_retry_at].lt(Time.current))) }
scope :needs_verification, -> { available_verifiables.merge(with_verification_state(:verification_pending).or(with_verification_state(:verification_failed).verification_retry_due)) }
scope :needs_reverification, -> { verification_succeeded.where("verified_at < ?", ::Gitlab::Geo.current_node.minimum_reverification_interval.days.ago) }
# rubocop:enable CodeReuse/ActiveRecord
state_machine :verification_state, initial: :verification_pending do
state :verification_pending, value: VERIFICATION_STATE_VALUES[:verification_pending]
state :verification_started, value: VERIFICATION_STATE_VALUES[:verification_started]
state :verification_succeeded, value: VERIFICATION_STATE_VALUES[:verification_succeeded] do
validates :verification_checksum, presence: true
end
state :verification_failed, value: VERIFICATION_STATE_VALUES[:verification_failed] do
validates :verification_failure, presence: true
end
before_transition any => :verification_started do |instance, _|
instance.verification_started_at = Time.current
end
before_transition [:verification_pending, :verification_started, :verification_succeeded] => :verification_pending do |instance, _|
instance.clear_verification_failure_fields!
end
before_transition verification_failed: :verification_pending do |instance, _|
# If transitioning from verification_failed, then don't clear
# verification_retry_count and verification_retry_at to ensure
# progressive backoff of syncs-due-to-verification-failures
instance.verification_failure = nil
end
before_transition any => :verification_failed do |instance, _|
instance.before_verification_failed
end
before_transition any => :verification_succeeded do |instance, _|
instance.verified_at = Time.current
instance.clear_verification_failure_fields!
end
event :verification_started do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_started
end
event :verification_succeeded do
transition verification_started: :verification_succeeded
end
event :verification_failed do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_failed
end
event :verification_pending do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_pending
end
end
def save_verification_details
return unless self.class.separate_verification_state_table?
return unless self.class.verifiables.primary_key_in(self).exists?
# During a transaction, `verification_state_object` could be built before
# a value for `verification_state_model_key` exists. So we check for that
# before saving the `verification_state_object`
unless verification_state_object.persisted?
verification_state_object[self.class.verification_state_model_key] = self.id
end
verification_state_object.save!
end
# Implement this method in the class that includes this concern to specify
# a different ActiveRecord association name that stores the verification state
# See module EE::MergeRequestDiff for example
def verification_state_object
raise NotImplementedError if self.class.separate_verification_state_table?
self
end
private_class_method :start_verification_batch
private_class_method :start_verification_batch_query
private_class_method :start_verification_batch_subselect
end
class_methods do
include Delay
def verification_state_value(state_string)
VERIFICATION_STATE_VALUES[state_string]
end
# Returns IDs of records that are pending verification.
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_pending_batch(batch_size:)
relation = verification_pending.order(Gitlab::Database.nulls_first_order(:verified_at)).limit(batch_size) # rubocop:disable CodeReuse/ActiveRecord
start_verification_batch(relation)
end
# Returns IDs of records that failed to verify (calculate and save checksum).
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_failed_batch(batch_size:)
relation = verification_failed.verification_retry_due.order(Gitlab::Database.nulls_first_order(:verification_retry_at)).limit(batch_size) # rubocop:disable CodeReuse/ActiveRecord
start_verification_batch(relation)
end
# @return [Integer] number of records that need verification
def needs_verification_count(limit:)
needs_verification.limit(limit).count # rubocop:disable CodeReuse/ActiveRecord
end
# @return [Integer] number of records that need reverification
def needs_reverification_count(limit:)
needs_reverification.limit(limit).count # rubocop:disable CodeReuse/ActiveRecord
end
# Atomically marks the records as verification_started, with a
# verification_started_at time, and returns the primary key of each
# updated row. This allows VerificationBatchWorker to concurrently get
# unique batches of primary keys to process.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Array<Integer>] primary key of each updated row
def start_verification_batch(relation)
query = start_verification_batch_query(relation)
# This query performs a write, so we need to wrap it in a transaction
# to stick to the primary database.
self.transaction do
self.connection.execute(query).to_a.map do |row|
row[self.verification_state_model_key.to_s]
end
end
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_started, with a verification_started_at time,
# and returns the primary key of each updated row.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return primary key of each row
def start_verification_batch_query(relation)
started_enum_value = VERIFICATION_STATE_VALUES[:verification_started]
<<~SQL.squish
UPDATE #{verification_state_table_name}
SET "verification_state" = #{started_enum_value},
"verification_started_at" = NOW()
WHERE #{self.verification_state_model_key} IN (#{start_verification_batch_subselect(relation).to_sql})
RETURNING #{self.verification_state_model_key}
SQL
end
# This query locks the rows during the transaction, and skips locked
# rows so that this query can be run concurrently, safely and reasonably
# efficiently.
# See https://gitlab.com/gitlab-org/gitlab/-/issues/300051#note_496889565
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which selects the primary keys to update
def start_verification_batch_subselect(relation)
relation
.select(self.verification_state_model_key)
.lock('FOR UPDATE SKIP LOCKED') # rubocop:disable CodeReuse/ActiveRecord
end
# Override this method in the class that includes this concern to specify
# a different ActiveRecord class to store verification state
# See module EE::MergeRequestDiff for example
def verification_state_table_class
self
end
# Overridden in ReplicableRegistry
def verification_state_model_key
verification_state_table_class.primary_key
end
def verification_state_table_name
verification_state_table_class.table_name
end
def verification_arel_table
verification_state_table_class.arel_table
end
# rubocop:disable CodeReuse/ActiveRecord
def verification_timed_out_batch_query
return verification_timed_out unless separate_verification_state_table?
verification_state_table_class.where(self.verification_state_model_key => verification_timed_out)
end
# rubocop:enable CodeReuse/ActiveRecord
# Fail verification for records which started verification a long time ago
def fail_verification_timeouts
attrs = {
verification_state: verification_state_value(:verification_failed),
verification_failure: "Verification timed out after #{VERIFICATION_TIMEOUT}",
verification_checksum: nil,
verification_retry_count: 1,
verification_retry_at: next_retry_time(1),
verified_at: Time.current
}
verification_timed_out_batch_query.each_batch do |relation|
relation.update_all(attrs)
end
end
# Reverifies batch and returns the number of records.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
def reverify_batch(batch_size:)
relation = reverification_batch_relation(batch_size: batch_size)
mark_as_verification_pending(relation)
end
# Returns IDs of records that need re-verification.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
# rubocop:disable CodeReuse/ActiveRecord
def reverification_batch_relation(batch_size:)
needs_reverification.order(:verified_at).limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Atomically marks the records as verification_pending.
# Returns the number of records set to be referified.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Integer] number of records
def mark_as_verification_pending(relation)
query = mark_as_verification_pending_query(relation)
self.connection.execute(query).cmd_tuples
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_pending
# and returns the number of updated rows.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return the number of rows
def mark_as_verification_pending_query(relation)
pending_enum_value = VERIFICATION_STATE_VALUES[:verification_pending]
<<~SQL.squish
UPDATE #{verification_state_table_name}
SET "verification_state" = #{pending_enum_value}
WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
SQL
end
# rubocop:disable CodeReuse/ActiveRecord
def pluck_verification_details_ids_in_range(range)
verification_state_table_class
.where(self.verification_state_model_key => range)
.pluck(self.verification_state_model_key)
end
# rubocop:enable CodeReuse/ActiveRecord
def pluck_verifiable_ids_in_range(range)
self
.verifiables
.primary_key_in(range)
.pluck_primary_key
end
# @return whether primary checksum data is stored in a table separate
# from the model table
def separate_verification_state_table?
verification_state_table_name != table_name
end
end
# Overridden by Geo::VerifiableRegistry
def clear_verification_failure_fields!
self.verification_retry_count = 0
self.verification_retry_at = nil
self.verification_failure = nil
end
# Overridden by Geo::VerifiableRegistry
def before_verification_failed
self.verification_retry_count ||= 0
self.verification_retry_count += 1
self.verification_retry_at = self.next_retry_time(self.verification_retry_count)
self.verified_at = Time.current
end
# Provides a safe and easy way to manage the verification state for a
# synchronous checksum calculation.
#
# @yieldreturn [String] calculated checksum value
def track_checksum_attempt!(&block)
# This line only applies to Geo::VerificationWorker, not
# Geo::VerificationBatchWorker, since the latter sets the whole batch to
# "verification_started" in the same DB query that fetches the batch.
verification_started! unless verification_started?
calculation_started_at = Time.current
checksum = yield
track_checksum_result!(checksum, calculation_started_at)
rescue StandardError => e
# Reset any potential changes from track_checksum_result, i.e.
# verification_retry_count may have been cleared.
reset
verification_failed_with_message!('Error during verification', e)
end
# Convenience method to update checksum and transition to success state.
#
# @param [String] checksum value generated by the checksum routine
# @param [DateTime] calculation_started_at the moment just before the
# checksum routine was called
def verification_succeeded_with_checksum!(checksum, calculation_started_at)
self.verification_checksum = checksum
self.verification_succeeded!
if resource_updated_during_checksum?(calculation_started_at)
# just let backfill pick it up
self.verification_pending!
elsif Gitlab::Geo.primary?
self.replicator.handle_after_checksum_succeeded
end
end
# Convenience method to update failure message and transition to failed
# state.
#
# @param [String] message error information
# @param [StandardError] error exception
def verification_failed_with_message!(message, error = nil)
log_error(message, error)
self.verification_failure = message
self.verification_failure += ": #{error.message}" if error.respond_to?(:message)
self.verification_failure.truncate(255)
self.verification_checksum = nil
self.verification_failed!
end
private
# Records the calculated checksum result
#
# Overridden by ReplicableRegistry so it can also compare with primary
# checksum.
#
# @param [String] calculated checksum value
# @param [Time] when checksum calculation was started
def track_checksum_result!(checksum, calculation_started_at)
verification_succeeded_with_checksum!(checksum, calculation_started_at)
end
def resource_updated_during_checksum?(calculation_started_at)
self.reset.verification_started_at > calculation_started_at
end
end
end
end
......@@ -8,7 +8,7 @@ require 'spec_helper'
# against a DummyModel.
# - Place tests in replicable_model_shared_examples.rb if you want them to be
# run against every real Model.
RSpec.describe Gitlab::Geo::ReplicableModel do
RSpec.describe Geo::ReplicableModel do
include ::EE::GeoHelpers
let_it_be(:primary_node) { create(:geo_node, :primary) }
......@@ -42,6 +42,17 @@ RSpec.describe Gitlab::Geo::ReplicableModel do
it 'instantiates a replicator into the model' do
expect(subject.replicator).to be_a(Geo::DummyReplicator)
end
context 'when replicator is not defined in inheriting class' do
before do
stub_const('DummyModel', Class.new(ApplicationRecord))
DummyModel.class_eval { include ::Geo::ReplicableModel }
end
it 'raises NotImplementedError' do
expect { DummyModel.new.replicator }.to raise_error(NotImplementedError)
end
end
end
describe '#in_replicables_for_current_secondary?' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::VerifiableModel do
include ::EE::GeoHelpers
context 'when separate table is used for verification state' do
before(:all) do
create_dummy_model_with_separate_state_table
end
after(:all) do
drop_dummy_model_with_separate_state_table
end
before do
stub_dummy_replicator_class(model_class: 'TestDummyModelWithSeparateState')
stub_dummy_model_with_separate_state_class
end
subject { TestDummyModelWithSeparateState.new }
describe '.verification_state_model_key' do
it 'returns the primary key of the state model' do
expect(subject.class.verification_state_model_key).to eq(TestDummyModelState.primary_key)
end
end
end
context 'when separate table is not used for verification state' do
before(:all) do
create_dummy_model_table
end
after(:all) do
drop_dummy_model_table
end
before do
stub_dummy_replicator_class
stub_dummy_model_class
end
subject { DummyModel.new }
describe '.verification_state_object' do
it 'returns self' do
expect(subject.verification_state_object.id).to eq(subject.id)
end
end
describe '.verification_state_model_key' do
it 'returns the primary key of the model' do
expect(subject.class.verification_state_model_key).to eq(DummyModel.primary_key)
end
end
end
end
......@@ -2,7 +2,7 @@
require 'spec_helper'
RSpec.describe Gitlab::Geo::VerificationState do
RSpec.describe Geo::VerificationState do
include ::EE::GeoHelpers
let_it_be(:primary_node) { create(:geo_node, :primary) }
......@@ -434,7 +434,7 @@ RSpec.describe Gitlab::Geo::VerificationState do
end
before do
stub_dummy_replicator_class(model_class: 'DummyModelWithSeparateState')
stub_dummy_replicator_class(model_class: 'TestDummyModelWithSeparateState')
stub_dummy_model_with_separate_state_class
end
......
......@@ -79,8 +79,8 @@ module EE
stub_const('DummyModel', Class.new(ApplicationRecord))
DummyModel.class_eval do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator Geo::DummyReplicator
......@@ -170,8 +170,8 @@ module EE
TestDummyModelWithSeparateState.class_eval do
self.table_name = '_test_dummy_model_with_separate_states'
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator Geo::DummyReplicator
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment