Commit fc930922 authored by Bob Van Landuyt's avatar Bob Van Landuyt

Merge branch '267546-limited-worker-loopless-inner-class' into 'master'

Implement the loopless mode for the cleanup policy worker

See merge request gitlab-org/gitlab!58123
parents 82a9fd86 0334ca5d
...@@ -7,6 +7,7 @@ class ContainerRepository < ApplicationRecord ...@@ -7,6 +7,7 @@ class ContainerRepository < ApplicationRecord
include Sortable include Sortable
WAITING_CLEANUP_STATUSES = %i[cleanup_scheduled cleanup_unfinished].freeze WAITING_CLEANUP_STATUSES = %i[cleanup_scheduled cleanup_unfinished].freeze
REQUIRING_CLEANUP_STATUSES = %i[cleanup_unscheduled cleanup_scheduled].freeze
belongs_to :project belongs_to :project
...@@ -31,6 +32,7 @@ class ContainerRepository < ApplicationRecord ...@@ -31,6 +32,7 @@ class ContainerRepository < ApplicationRecord
scope :for_project_id, ->(project_id) { where(project_id: project_id) } scope :for_project_id, ->(project_id) { where(project_id: project_id) }
scope :search_by_name, ->(query) { fuzzy_search(query, [:name], use_minimum_char_limit: false) } scope :search_by_name, ->(query) { fuzzy_search(query, [:name], use_minimum_char_limit: false) }
scope :waiting_for_cleanup, -> { where(expiration_policy_cleanup_status: WAITING_CLEANUP_STATUSES) } scope :waiting_for_cleanup, -> { where(expiration_policy_cleanup_status: WAITING_CLEANUP_STATUSES) }
scope :expiration_policy_started_at_nil_or_before, ->(timestamp) { where('expiration_policy_started_at < ? OR expiration_policy_started_at IS NULL', timestamp) }
def self.exists_by_path?(path) def self.exists_by_path?(path)
where( where(
...@@ -39,6 +41,23 @@ class ContainerRepository < ApplicationRecord ...@@ -39,6 +41,23 @@ class ContainerRepository < ApplicationRecord
).exists? ).exists?
end end
def self.with_enabled_policy
joins("INNER JOIN container_expiration_policies ON container_repositories.project_id = container_expiration_policies.project_id")
.where(container_expiration_policies: { enabled: true })
end
def self.requiring_cleanup
where(
container_repositories: { expiration_policy_cleanup_status: REQUIRING_CLEANUP_STATUSES },
project_id: ::ContainerExpirationPolicy.runnable_schedules
.select(:project_id)
)
end
def self.with_unfinished_cleanup
with_enabled_policy.cleanup_unfinished
end
# rubocop: disable CodeReuse/ServiceClass # rubocop: disable CodeReuse/ServiceClass
def registry def registry
@registry ||= begin @registry ||= begin
......
...@@ -13,7 +13,14 @@ module ContainerExpirationPolicies ...@@ -13,7 +13,14 @@ module ContainerExpirationPolicies
def execute def execute
return ServiceResponse.error(message: 'no repository') unless repository return ServiceResponse.error(message: 'no repository') unless repository
unless policy.valid?
disable_policy!
return ServiceResponse.error(message: 'invalid policy')
end
repository.start_expiration_policy! repository.start_expiration_policy!
schedule_next_run_if_needed
begin begin
service_result = Projects::ContainerRepository::CleanupTagsService service_result = Projects::ContainerRepository::CleanupTagsService
...@@ -28,7 +35,6 @@ module ContainerExpirationPolicies ...@@ -28,7 +35,6 @@ module ContainerExpirationPolicies
if service_result[:status] == :success if service_result[:status] == :success
repository.update!( repository.update!(
expiration_policy_cleanup_status: :cleanup_unscheduled, expiration_policy_cleanup_status: :cleanup_unscheduled,
expiration_policy_started_at: nil,
expiration_policy_completed_at: Time.zone.now expiration_policy_completed_at: Time.zone.now
) )
...@@ -42,6 +48,27 @@ module ContainerExpirationPolicies ...@@ -42,6 +48,27 @@ module ContainerExpirationPolicies
private private
def schedule_next_run_if_needed
return unless Feature.enabled?(:container_registry_expiration_policies_loopless)
return if policy.next_run_at.future?
repos_before_next_run = ::ContainerRepository.for_project_id(policy.project_id)
.expiration_policy_started_at_nil_or_before(policy.next_run_at)
return if repos_before_next_run.exists?
policy.schedule_next_run!
end
def disable_policy!
policy.disable!
repository.cleanup_unscheduled!
Gitlab::ErrorTracking.log_exception(
::ContainerExpirationPolicyWorker::InvalidPolicyError.new,
container_expiration_policy_id: policy.id
)
end
def success(cleanup_status, service_result) def success(cleanup_status, service_result)
payload = { payload = {
cleanup_status: cleanup_status, cleanup_status: cleanup_status,
......
...@@ -24,16 +24,22 @@ module ContainerExpirationPolicies ...@@ -24,16 +24,22 @@ module ContainerExpirationPolicies
cleanup_tags_service_deleted_size cleanup_tags_service_deleted_size
].freeze ].freeze
delegate :perform_work, :remaining_work_count, to: :inner_instance def perform_work
return unless throttling_enabled?
return unless container_repository
def inner_instance log_extra_metadata_on_done(:container_repository_id, container_repository.id)
strong_memoize(:inner_instance) do log_extra_metadata_on_done(:project_id, project.id)
if loopless_enabled?
Loopless.new(self) unless allowed_to_run?
else container_repository.cleanup_unscheduled!
Looping.new(self) log_extra_metadata_on_done(:cleanup_status, :skipped)
end return
end end
result = ContainerExpirationPolicies::CleanupService.new(container_repository)
.execute
log_on_done(result)
end end
def max_running_jobs def max_running_jobs
...@@ -42,6 +48,97 @@ module ContainerExpirationPolicies ...@@ -42,6 +48,97 @@ module ContainerExpirationPolicies
::Gitlab::CurrentSettings.container_registry_expiration_policies_worker_capacity ::Gitlab::CurrentSettings.container_registry_expiration_policies_worker_capacity
end end
def remaining_work_count
total_count = cleanup_scheduled_count + cleanup_unfinished_count
log_info(
cleanup_scheduled_count: cleanup_scheduled_count,
cleanup_unfinished_count: cleanup_unfinished_count,
cleanup_total_count: total_count
)
total_count
end
private
def container_repository
strong_memoize(:container_repository) do
ContainerRepository.transaction do
# rubocop: disable CodeReuse/ActiveRecord
# We need a lock to prevent two workers from picking up the same row
container_repository = if loopless_enabled?
next_container_repository
else
ContainerRepository.waiting_for_cleanup
.order(:expiration_policy_cleanup_status, :expiration_policy_started_at)
.limit(1)
.lock('FOR UPDATE SKIP LOCKED')
.first
end
# rubocop: enable CodeReuse/ActiveRecord
container_repository&.tap(&:cleanup_ongoing!)
end
end
end
def next_container_repository
# rubocop: disable CodeReuse/ActiveRecord
next_one_requiring = ContainerRepository.requiring_cleanup
.order(:expiration_policy_cleanup_status, :expiration_policy_started_at)
.limit(1)
.lock('FOR UPDATE SKIP LOCKED')
.first
return next_one_requiring if next_one_requiring
ContainerRepository.with_unfinished_cleanup
.order(:expiration_policy_started_at)
.limit(1)
.lock('FOR UPDATE SKIP LOCKED')
.first
# rubocop: enable CodeReuse/ActiveRecord
end
def cleanup_scheduled_count
strong_memoize(:cleanup_scheduled_count) do
if loopless_enabled?
limit = max_running_jobs + 1
ContainerExpirationPolicy.with_container_repositories
.runnable_schedules
.limit(limit)
.count
else
ContainerRepository.cleanup_scheduled.count
end
end
end
def cleanup_unfinished_count
strong_memoize(:cleanup_unfinished_count) do
if loopless_enabled?
limit = max_running_jobs + 1
ContainerRepository.with_unfinished_cleanup
.limit(limit)
.count
else
ContainerRepository.cleanup_unfinished.count
end
end
end
def allowed_to_run?
return false unless policy&.enabled && policy&.next_run_at
now = Time.zone.now
if loopless_enabled?
policy.next_run_at < now || (now + max_cleanup_execution_time.seconds < policy.next_run_at)
else
now + max_cleanup_execution_time.seconds < policy.next_run_at
end
end
def throttling_enabled? def throttling_enabled?
Feature.enabled?(:container_registry_expiration_policies_throttling) Feature.enabled?(:container_registry_expiration_policies_throttling)
end end
...@@ -59,6 +156,11 @@ module ContainerExpirationPolicies ...@@ -59,6 +156,11 @@ module ContainerExpirationPolicies
end end
def log_on_done(result) def log_on_done(result)
if result.error?
log_extra_metadata_on_done(:cleanup_status, :error)
log_extra_metadata_on_done(:cleanup_error_message, result.message)
end
LOG_ON_DONE_FIELDS.each do |field| LOG_ON_DONE_FIELDS.each do |field|
value = result.payload[field] value = result.payload[field]
...@@ -76,99 +178,12 @@ module ContainerExpirationPolicies ...@@ -76,99 +178,12 @@ module ContainerExpirationPolicies
log_extra_metadata_on_done(:running_jobs_count, running_jobs_count) log_extra_metadata_on_done(:running_jobs_count, running_jobs_count)
end end
# rubocop: disable Scalability/IdempotentWorker def policy
# TODO: move the logic from this class to the parent one when container_registry_expiration_policies_loopless is removed project.container_expiration_policy
# Tracking issue: https://gitlab.com/gitlab-org/gitlab/-/issues/325273
class Loopless
# TODO fill the logic here with the approach documented in
# https://gitlab.com/gitlab-org/gitlab/-/issues/267546#limited-worker
def initialize(parent)
@parent = parent
end
end end
# rubocop: enable Scalability/IdempotentWorker
# rubocop: disable Scalability/IdempotentWorker
# TODO remove this class when `container_registry_expiration_policies_loopless` is removed
# Tracking issue: https://gitlab.com/gitlab-org/gitlab/-/issues/325273
class Looping
include Gitlab::Utils::StrongMemoize
delegate :throttling_enabled?,
:log_extra_metadata_on_done,
:log_info,
:log_on_done,
:max_cleanup_execution_time,
to: :@parent
def initialize(parent)
@parent = parent
end
def perform_work
return unless throttling_enabled?
return unless container_repository
log_extra_metadata_on_done(:container_repository_id, container_repository.id)
log_extra_metadata_on_done(:project_id, project.id)
unless allowed_to_run?(container_repository)
container_repository.cleanup_unscheduled!
log_extra_metadata_on_done(:cleanup_status, :skipped)
return
end
result = ContainerExpirationPolicies::CleanupService.new(container_repository)
.execute
log_on_done(result)
end
def remaining_work_count
cleanup_scheduled_count = ContainerRepository.cleanup_scheduled.count
cleanup_unfinished_count = ContainerRepository.cleanup_unfinished.count
total_count = cleanup_scheduled_count + cleanup_unfinished_count
log_info(
cleanup_scheduled_count: cleanup_scheduled_count,
cleanup_unfinished_count: cleanup_unfinished_count,
cleanup_total_count: total_count
)
total_count def project
end container_repository.project
private
def allowed_to_run?(container_repository)
return false unless policy&.enabled && policy&.next_run_at
Time.zone.now + max_cleanup_execution_time.seconds < policy.next_run_at
end
def policy
project.container_expiration_policy
end
def project
container_repository.project
end
def container_repository
strong_memoize(:container_repository) do
ContainerRepository.transaction do
# rubocop: disable CodeReuse/ActiveRecord
# We need a lock to prevent two workers from picking up the same row
container_repository = ContainerRepository.waiting_for_cleanup
.order(:expiration_policy_cleanup_status, :expiration_policy_started_at)
.limit(1)
.lock('FOR UPDATE SKIP LOCKED')
.first
# rubocop: enable CodeReuse/ActiveRecord
container_repository&.tap(&:cleanup_ongoing!)
end
end
end
end end
# rubocop: enable Scalability/IdempotentWorker
end end
end end
---
title: Add indexes for cleanup policies on container_repositories and container_expiration_policies
merge_request: 58123
author:
type: added
# frozen_string_literal: true
class AddProjectIdNextRunAtIndexToContainerExpirationPolicies < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
disable_ddl_transaction!
INDEX_NAME = 'idx_container_exp_policies_on_project_id_next_run_at'
def up
add_concurrent_index :container_expiration_policies, [:project_id, :next_run_at], name: INDEX_NAME, where: 'enabled = true'
end
def down
remove_concurrent_index :container_expiration_policies, [:project_id, :next_run_at], name: INDEX_NAME
end
end
eaefc2a0f08ce312b1ae3fb100e4a818eb3013b95c38d940371a25b605b09ca1
\ No newline at end of file
...@@ -22059,6 +22059,8 @@ CREATE INDEX idx_award_emoji_on_user_emoji_name_awardable_type_awardable_id ON a ...@@ -22059,6 +22059,8 @@ CREATE INDEX idx_award_emoji_on_user_emoji_name_awardable_type_awardable_id ON a
CREATE INDEX idx_ci_pipelines_artifacts_locked ON ci_pipelines USING btree (ci_ref_id, id) WHERE (locked = 1); CREATE INDEX idx_ci_pipelines_artifacts_locked ON ci_pipelines USING btree (ci_ref_id, id) WHERE (locked = 1);
CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at ON container_expiration_policies USING btree (project_id, next_run_at) WHERE (enabled = true);
CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at_enabled ON container_expiration_policies USING btree (project_id, next_run_at, enabled); CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at_enabled ON container_expiration_policies USING btree (project_id, next_run_at, enabled);
CREATE INDEX idx_container_repositories_on_exp_cleanup_status_and_start_date ON container_repositories USING btree (expiration_policy_cleanup_status, expiration_policy_started_at); CREATE INDEX idx_container_repositories_on_exp_cleanup_status_and_start_date ON container_repositories USING btree (expiration_policy_cleanup_status, expiration_policy_started_at);
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe ContainerRepository do RSpec.describe ContainerRepository do
using RSpec::Parameterized::TableSyntax
let(:group) { create(:group, name: 'group') } let(:group) { create(:group, name: 'group') }
let(:project) { create(:project, path: 'test', group: group) } let(:project) { create(:project, path: 'test', group: group) }
...@@ -29,18 +31,6 @@ RSpec.describe ContainerRepository do ...@@ -29,18 +31,6 @@ RSpec.describe ContainerRepository do
end end
end end
describe '.exists_by_path?' do
it 'returns true for known container repository paths' do
path = ContainerRegistry::Path.new("#{project.full_path}/#{repository.name}")
expect(described_class.exists_by_path?(path)).to be_truthy
end
it 'returns false for unknown container repository paths' do
path = ContainerRegistry::Path.new('you/dont/know/me')
expect(described_class.exists_by_path?(path)).to be_falsey
end
end
describe '#tag' do describe '#tag' do
it 'has a test tag' do it 'has a test tag' do
expect(repository.tag('test')).not_to be_nil expect(repository.tag('test')).not_to be_nil
...@@ -359,6 +349,17 @@ RSpec.describe ContainerRepository do ...@@ -359,6 +349,17 @@ RSpec.describe ContainerRepository do
it { is_expected.to contain_exactly(repository) } it { is_expected.to contain_exactly(repository) }
end end
describe '.expiration_policy_started_at_nil_or_before' do
let_it_be(:repository1) { create(:container_repository, expiration_policy_started_at: nil) }
let_it_be(:repository2) { create(:container_repository, expiration_policy_started_at: 1.day.ago) }
let_it_be(:repository3) { create(:container_repository, expiration_policy_started_at: 2.hours.ago) }
let_it_be(:repository4) { create(:container_repository, expiration_policy_started_at: 1.week.ago) }
subject { described_class.expiration_policy_started_at_nil_or_before(3.hours.ago) }
it { is_expected.to contain_exactly(repository1, repository2, repository4) }
end
describe '.waiting_for_cleanup' do describe '.waiting_for_cleanup' do
let_it_be(:repository_cleanup_scheduled) { create(:container_repository, :cleanup_scheduled) } let_it_be(:repository_cleanup_scheduled) { create(:container_repository, :cleanup_scheduled) }
let_it_be(:repository_cleanup_unfinished) { create(:container_repository, :cleanup_unfinished) } let_it_be(:repository_cleanup_unfinished) { create(:container_repository, :cleanup_unfinished) }
...@@ -368,4 +369,74 @@ RSpec.describe ContainerRepository do ...@@ -368,4 +369,74 @@ RSpec.describe ContainerRepository do
it { is_expected.to contain_exactly(repository_cleanup_scheduled, repository_cleanup_unfinished) } it { is_expected.to contain_exactly(repository_cleanup_scheduled, repository_cleanup_unfinished) }
end end
describe '.exists_by_path?' do
it 'returns true for known container repository paths' do
path = ContainerRegistry::Path.new("#{project.full_path}/#{repository.name}")
expect(described_class.exists_by_path?(path)).to be_truthy
end
it 'returns false for unknown container repository paths' do
path = ContainerRegistry::Path.new('you/dont/know/me')
expect(described_class.exists_by_path?(path)).to be_falsey
end
end
describe '.with_enabled_policy' do
let_it_be(:repository) { create(:container_repository) }
let_it_be(:repository2) { create(:container_repository) }
subject { described_class.with_enabled_policy }
before do
repository.project.container_expiration_policy.update!(enabled: true)
end
it { is_expected.to eq([repository]) }
end
context 'with repositories' do
let_it_be_with_reload(:repository) { create(:container_repository, :cleanup_unscheduled) }
let_it_be(:other_repository) { create(:container_repository, :cleanup_unscheduled) }
let(:policy) { repository.project.container_expiration_policy }
before do
ContainerExpirationPolicy.update_all(enabled: true)
end
describe '.requiring_cleanup' do
subject { described_class.requiring_cleanup }
context 'with next_run_at in the future' do
before do
policy.update_column(:next_run_at, 10.minutes.from_now)
end
it { is_expected.to eq([]) }
end
context 'with next_run_at in the past' do
before do
policy.update_column(:next_run_at, 10.minutes.ago)
end
it { is_expected.to eq([repository]) }
end
end
describe '.with_unfinished_cleanup' do
subject { described_class.with_unfinished_cleanup }
it { is_expected.to eq([]) }
context 'with an unfinished repository' do
before do
repository.cleanup_unfinished!
end
it { is_expected.to eq([repository]) }
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment