# frozen_string_literal: true

module Geo
  # This concern is included on VerifiableModel and on VerifiableRegistry to
  # manage their verification fields.

  module VerificationState
    extend ActiveSupport::Concern
    include ::ShaAttribute
    include Delay
    include EachBatch
    include Gitlab::Geo::LogHelpers

    VERIFICATION_STATE_VALUES = {
      verification_pending: 0,
      verification_started: 1,
      verification_succeeded: 2,
      verification_failed: 3
    }.freeze

    VERIFICATION_TIMEOUT = 8.hours

    included do
      sha_attribute :verification_checksum

      scope :verification_pending, -> { available_verifiables.with_verification_state(:verification_pending) }
      scope :verification_started, -> { available_verifiables.with_verification_state(:verification_started) }
      scope :verification_succeeded, -> { available_verifiables.with_verification_state(:verification_succeeded) }
      scope :verification_failed, -> { available_verifiables.with_verification_state(:verification_failed) }
      scope :checksummed, -> { where.not(verification_checksum: nil) }
      scope :not_checksummed, -> { where(verification_checksum: nil) }
      scope :verification_timed_out, -> { available_verifiables.where(verification_arel_table[:verification_state].eq(1)).where(verification_arel_table[:verification_started_at].lt(VERIFICATION_TIMEOUT.ago)) }
      scope :verification_retry_due, -> { where(verification_arel_table[:verification_retry_at].eq(nil).or(verification_arel_table[:verification_retry_at].lt(Time.current))) }
      scope :needs_verification, -> { available_verifiables.merge(with_verification_state(:verification_pending).or(with_verification_state(:verification_failed).verification_retry_due)) }
      scope :needs_reverification, -> { verification_succeeded.where("verified_at < ?", ::Gitlab::Geo.current_node.minimum_reverification_interval.days.ago) }

      state_machine :verification_state, initial: :verification_pending do
        state :verification_pending, value: VERIFICATION_STATE_VALUES[:verification_pending]
        state :verification_started, value: VERIFICATION_STATE_VALUES[:verification_started]
        state :verification_succeeded, value: VERIFICATION_STATE_VALUES[:verification_succeeded] do
          validates :verification_checksum, presence: true
        end
        state :verification_failed, value: VERIFICATION_STATE_VALUES[:verification_failed] do
          validates :verification_failure, presence: true
        end

        before_transition any => :verification_started do |instance, _|
          instance.verification_started_at = Time.current
        end

        before_transition [:verification_pending, :verification_started, :verification_succeeded] => :verification_pending do |instance, _|
          instance.clear_verification_failure_fields!
        end

        before_transition verification_failed: :verification_pending do |instance, _|
          # If transitioning from verification_failed, then don't clear
          # verification_retry_count and verification_retry_at to ensure
          # progressive backoff of syncs-due-to-verification-failures
          instance.verification_failure = nil
        end

        before_transition any => :verification_failed do |instance, _|
          instance.before_verification_failed
        end

        before_transition any => :verification_succeeded do |instance, _|
          instance.verified_at = Time.current
          instance.clear_verification_failure_fields!
        end

        event :verification_started do
          transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_started
        end

        event :verification_succeeded do
          transition verification_started: :verification_succeeded
        end

        event :verification_failed do
          transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_failed
        end

        event :verification_pending do
          transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_pending
        end
      end

      private_class_method :start_verification_batch
      private_class_method :start_verification_batch_query
      private_class_method :start_verification_batch_subselect
    end

    class_methods do
      include Delay

      def verification_state_value(state_string)
        VERIFICATION_STATE_VALUES[state_string]
      end

      # Returns IDs of records that are pending verification.
      #
      # Atomically marks those records "verification_started" in the same DB
      # query.
      #
      def verification_pending_batch(batch_size:)
        relation = verification_pending.order(Gitlab::Database.nulls_first_order(:verified_at)).limit(batch_size)

        start_verification_batch(relation)
      end

      # Returns IDs of records that failed to verify (calculate and save checksum).
      #
      # Atomically marks those records "verification_started" in the same DB
      # query.
      #
      def verification_failed_batch(batch_size:)
        relation = verification_failed.verification_retry_due.order(Gitlab::Database.nulls_first_order(:verification_retry_at)).limit(batch_size)

        start_verification_batch(relation)
      end

      # @return [Integer] number of records that need verification
      def needs_verification_count(limit:)
        needs_verification.limit(limit).count
      end

      # @return [Integer] number of records that need reverification
      def needs_reverification_count(limit:)
        needs_reverification.limit(limit).count
      end

      # Atomically marks the records as verification_started, with a
      # verification_started_at time, and returns the primary key of each
      # updated row. This allows VerificationBatchWorker to concurrently get
      # unique batches of primary keys to process.
      #
      # @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
      # @return [Array<Integer>] primary key of each updated row
      def start_verification_batch(relation)
        query = start_verification_batch_query(relation)

        # This query performs a write, so we need to wrap it in a transaction
        # to stick to the primary database.
        self.transaction do
          self.connection.execute(query).to_a.map do |row|
            row[self.verification_state_model_key.to_s]
          end
        end
      end

      # Returns a SQL statement which would update all the rows in the
      # relation as verification_started, with a verification_started_at time,
      # and returns the primary key of each updated row.
      #
      # @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
      # @return [String] SQL statement which would update all and return primary key of each row
      def start_verification_batch_query(relation)
        started_enum_value = VERIFICATION_STATE_VALUES[:verification_started]

        <<~SQL.squish
          UPDATE #{verification_state_table_name}
          SET "verification_state" = #{started_enum_value},
            "verification_started_at" = NOW()
          WHERE #{self.verification_state_model_key} IN (#{start_verification_batch_subselect(relation).to_sql})

          RETURNING #{self.verification_state_model_key}
        SQL
      end

      # This query locks the rows during the transaction, and skips locked
      # rows so that this query can be run concurrently, safely and reasonably
      # efficiently.
      # See https://gitlab.com/gitlab-org/gitlab/-/issues/300051#note_496889565
      #
      # @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
      # @return [String] SQL statement which selects the primary keys to update
      def start_verification_batch_subselect(relation)
        relation
          .select(self.verification_state_model_key)
          .lock('FOR UPDATE SKIP LOCKED')
      end

      # Override this method in the class that includes this concern to specify
      # a different ActiveRecord class to store verification state
      # See module EE::MergeRequestDiff for example
      def verification_state_table_class
        self
      end

      # Overridden in ReplicableRegistry
      def verification_state_model_key
        verification_state_table_class.primary_key
      end

      def verification_state_table_name
        verification_state_table_class.table_name
      end

      def verification_arel_table
        verification_state_table_class.arel_table
      end

      def verification_timed_out_batch_query
        return verification_timed_out unless separate_verification_state_table?

        verification_state_table_class.where(self.verification_state_model_key => verification_timed_out)
      end

      # Fail verification for records which started verification a long time ago
      def fail_verification_timeouts
        attrs = {
          verification_state: verification_state_value(:verification_failed),
          verification_failure: "Verification timed out after #{VERIFICATION_TIMEOUT}",
          verification_checksum: nil,
          verification_retry_count: 1,
          verification_retry_at: next_retry_time(1),
          verified_at: Time.current
        }

        verification_timed_out_batch_query.each_batch do |relation|
          relation.update_all(attrs)
        end
      end

      # Reverifies batch and returns the number of records.
      #
      # Atomically marks those records "verification_pending" in the same DB
      # query.
      #
      def reverify_batch(batch_size:)
        relation = reverification_batch_relation(batch_size: batch_size)

        mark_as_verification_pending(relation)
      end

      # Returns IDs of records that need re-verification.
      #
      # Atomically marks those records "verification_pending" in the same DB
      # query.
      def reverification_batch_relation(batch_size:)
        needs_reverification.order(:verified_at).limit(batch_size)
      end

      # Atomically marks the records as verification_pending.
      # Returns the number of records set to be referified.
      #
      # @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
      # @return [Integer] number of records
      def mark_as_verification_pending(relation)
        query = mark_as_verification_pending_query(relation)

        self.connection.execute(query).cmd_tuples
      end

      # Returns a SQL statement which would update all the rows in the
      # relation as verification_pending
      # and returns the number of updated rows.
      #
      # @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
      # @return [String] SQL statement which would update all and return the number of rows
      def mark_as_verification_pending_query(relation)
        pending_enum_value = VERIFICATION_STATE_VALUES[:verification_pending]

        <<~SQL.squish
          UPDATE #{verification_state_table_name}
          SET "verification_state" = #{pending_enum_value}
          WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
        SQL
      end
    end

    # Overridden by Geo::VerifiableRegistry
    def clear_verification_failure_fields!
      self.verification_retry_count = 0
      self.verification_retry_at = nil
      self.verification_failure = nil
    end

    # Overridden by Geo::VerifiableRegistry
    def before_verification_failed
      self.verification_retry_count ||= 0
      self.verification_retry_count += 1
      self.verification_retry_at = self.next_retry_time(self.verification_retry_count)
      self.verified_at = Time.current
    end

    # Provides a safe and easy way to manage the verification state for a
    # synchronous checksum calculation.
    #
    # @yieldreturn [String] calculated checksum value
    def track_checksum_attempt!(&block)
      # This line only applies to Geo::VerificationWorker, not
      # Geo::VerificationBatchWorker, since the latter sets the whole batch to
      # "verification_started" in the same DB query that fetches the batch.
      verification_started! unless verification_started?

      calculation_started_at = Time.current

      checksum = yield

      track_checksum_result!(checksum, calculation_started_at)
    rescue StandardError => e
      # Reset any potential changes from track_checksum_result, i.e.
      # verification_retry_count may have been cleared.
      reset

      verification_failed_with_message!('Error during verification', e)
    end

    # Convenience method to update checksum and transition to success state.
    #
    # @param [String] checksum value generated by the checksum routine
    # @param [DateTime] calculation_started_at the moment just before the
    #                   checksum routine was called
    def verification_succeeded_with_checksum!(checksum, calculation_started_at)
      self.verification_checksum = checksum

      self.verification_succeeded!

      if resource_updated_during_checksum?(calculation_started_at)
        # just let backfill pick it up
        self.verification_pending!
      elsif Gitlab::Geo.primary?
        self.replicator.handle_after_checksum_succeeded
      end
    end

    # Convenience method to update failure message and transition to failed
    # state.
    #
    # @param [String] message error information
    # @param [StandardError] error exception
    def verification_failed_with_message!(message, error = nil)
      log_error(message, error)

      self.verification_failure = message
      self.verification_failure += ": #{error.message}" if error.respond_to?(:message)
      self.verification_failure.truncate(255)
      self.verification_checksum = nil

      self.verification_failed!
    end

    private

    # Records the calculated checksum result
    #
    # Overridden by ReplicableRegistry so it can also compare with primary
    # checksum.
    #
    # @param [String] calculated checksum value
    # @param [Time] when checksum calculation was started
    def track_checksum_result!(checksum, calculation_started_at)
      verification_succeeded_with_checksum!(checksum, calculation_started_at)
    end

    def resource_updated_during_checksum?(calculation_started_at)
      self.reset.verification_started_at > calculation_started_at
    end
  end
end