Commit 0080186a authored by Patrick Bajao's avatar Patrick Bajao

Schedule MergeRequestCleanupRefsWorker more efficiently

This converts the MergeRequestCleanupRefsWorker to be a limited
capacity worker.

The ScheduleMergeRequestCleanupRefsWorker will still be enqueuing
jobs but it'll be based on capacity. If there's no capacity then
no new jobs will be enqueued.

For now, we are capping it at 4 jobs at a time. This will be later
on be configurable.

This is still behind the merge_request_refs_cleanup feature flag
so we can easily turn it off on production while testing.

Changelog: changed
parent 983fe9da
# frozen_string_literal: true
class MergeRequest::CleanupSchedule < ApplicationRecord
STATUSES = {
unstarted: 0,
running: 1,
completed: 2,
failed: 3
}.freeze
belongs_to :merge_request, inverse_of: :cleanup_schedule
validates :scheduled_at, presence: true
def self.scheduled_merge_request_ids(limit)
where('completed_at IS NULL AND scheduled_at <= NOW()')
state_machine :status, initial: :unstarted do
state :unstarted, value: STATUSES[:unstarted]
state :running, value: STATUSES[:running]
state :completed, value: STATUSES[:completed]
state :failed, value: STATUSES[:failed]
event :run do
transition unstarted: :running
end
event :retry do
transition running: :unstarted
end
event :complete do
transition running: :completed
end
event :mark_as_failed do
transition running: :failed
end
before_transition to: [:completed] do |cleanup_schedule, _transition|
cleanup_schedule.completed_at = Time.current
end
before_transition from: :running, to: [:unstarted, :failed] do |cleanup_schedule, _transition|
cleanup_schedule.failed_count += 1
end
end
scope :scheduled_and_unstarted, -> {
where('completed_at IS NULL AND scheduled_at <= NOW() AND status = ?', STATUSES[:unstarted])
.order('scheduled_at DESC')
.limit(limit)
.pluck(:merge_request_id)
}
def self.start_next
MergeRequest::CleanupSchedule.transaction do
cleanup_schedule = scheduled_and_unstarted.lock('FOR UPDATE SKIP LOCKED').first
next if cleanup_schedule.blank?
cleanup_schedule.run!
cleanup_schedule
end
end
end
......@@ -2,6 +2,8 @@
class MergeRequestCleanupRefsWorker
include ApplicationWorker
include LimitedCapacity::Worker
include Gitlab::Utils::StrongMemoize
sidekiq_options retry: 3
......@@ -9,20 +11,60 @@ class MergeRequestCleanupRefsWorker
tags :exclude_from_kubernetes
idempotent!
def perform(merge_request_id)
return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false)
# Hard-coded to 4 for now. Will be configurable later on via application settings.
# This means, there can only be 4 jobs running at the same time at maximum.
MAX_RUNNING_JOBS = 4
FAILURE_THRESHOLD = 3
merge_request = MergeRequest.find_by_id(merge_request_id)
def perform_work
return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false)
unless merge_request
logger.error("Failed to find merge request with ID: #{merge_request_id}")
logger.error('No existing merge request to be cleaned up.')
return
end
result = ::MergeRequests::CleanupRefsService.new(merge_request).execute
log_extra_metadata_on_done(:merge_request_id, merge_request.id)
return if result[:status] == :success
result = MergeRequests::CleanupRefsService.new(merge_request).execute
logger.error("Failed cleanup refs of merge request (#{merge_request_id}): #{result[:message]}")
if result[:status] == :success
merge_request_cleanup_schedule.complete!
else
if merge_request_cleanup_schedule.failed_count < FAILURE_THRESHOLD
merge_request_cleanup_schedule.retry!
else
merge_request_cleanup_schedule.mark_as_failed!
end
log_extra_metadata_on_done(:message, result[:message])
end
log_extra_metadata_on_done(:status, merge_request_cleanup_schedule.status)
end
def remaining_work_count
MergeRequest::CleanupSchedule
.scheduled_and_unstarted
.limit(max_running_jobs)
.count
end
def max_running_jobs
MAX_RUNNING_JOBS
end
private
def merge_request
strong_memoize(:merge_request) do
merge_request_cleanup_schedule&.merge_request
end
end
def merge_request_cleanup_schedule
strong_memoize(:merge_request_cleanup_schedule) do
MergeRequest::CleanupSchedule.start_next
end
end
end
......@@ -10,21 +10,10 @@ class ScheduleMergeRequestCleanupRefsWorker
tags :exclude_from_kubernetes
idempotent!
# Based on existing data, MergeRequestCleanupRefsWorker can run 3 jobs per
# second. This means that 180 jobs can be performed but since there are some
# spikes from time time, it's better to give it some allowance.
LIMIT = 180
DELAY = 10.seconds
BATCH_SIZE = 30
def perform
return if Gitlab::Database.read_only?
return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false)
ids = MergeRequest::CleanupSchedule.scheduled_merge_request_ids(LIMIT).map { |id| [id] }
MergeRequestCleanupRefsWorker.bulk_perform_in(DELAY, ids, batch_size: BATCH_SIZE) # rubocop:disable Scalability/BulkPerformWithContext
log_extra_metadata_on_done(:merge_requests_count, ids.size)
MergeRequestCleanupRefsWorker.perform_with_capacity
end
end
# frozen_string_literal: true
class AddStatusToMergeRequestCleanupSchedules < ActiveRecord::Migration[6.1]
include Gitlab::Database::MigrationHelpers
INDEX_NAME = 'index_merge_request_cleanup_schedules_on_status'
disable_ddl_transaction!
def up
unless column_exists?(:merge_request_cleanup_schedules, :status)
add_column(:merge_request_cleanup_schedules, :status, :integer, limit: 2, default: 0, null: false)
end
add_concurrent_index(:merge_request_cleanup_schedules, :status, name: INDEX_NAME)
end
def down
remove_concurrent_index_by_name(:merge_request_cleanup_schedules, INDEX_NAME)
if column_exists?(:merge_request_cleanup_schedules, :status)
remove_column(:merge_request_cleanup_schedules, :status)
end
end
end
# frozen_string_literal: true
class AddFailedCountToMergeRequestCleanupSchedules < ActiveRecord::Migration[6.1]
include Gitlab::Database::MigrationHelpers
def change
add_column :merge_request_cleanup_schedules, :failed_count, :integer, default: 0, null: false
end
end
# frozen_string_literal: true
class UpdateMergeRequestCleanupSchedulesScheduledAtIndex < ActiveRecord::Migration[6.1]
include Gitlab::Database::MigrationHelpers
INDEX_NAME = 'index_mr_cleanup_schedules_timestamps_status'
OLD_INDEX_NAME = 'index_mr_cleanup_schedules_timestamps'
disable_ddl_transaction!
def up
add_concurrent_index(:merge_request_cleanup_schedules, :scheduled_at, where: 'completed_at IS NULL AND status = 0', name: INDEX_NAME)
remove_concurrent_index_by_name(:merge_request_cleanup_schedules, OLD_INDEX_NAME)
end
def down
remove_concurrent_index_by_name(:merge_request_cleanup_schedules, INDEX_NAME)
add_concurrent_index(:merge_request_cleanup_schedules, :scheduled_at, where: 'completed_at IS NULL', name: OLD_INDEX_NAME)
end
end
98d4deaf0564119c1ee44d76d3a30bff1a0fceb7cab67c5dbef576faef62ddf5
\ No newline at end of file
77f6db1d2aeebdefd76c96966da6c9e4ce5da2c92a42f6ac2398b35fa21c680f
\ No newline at end of file
2899d954a199fa52bf6ab4beca5f22dcb9f9f0312e658f1307d1a7355394f1bb
\ No newline at end of file
......@@ -14711,7 +14711,9 @@ CREATE TABLE merge_request_cleanup_schedules (
scheduled_at timestamp with time zone NOT NULL,
completed_at timestamp with time zone,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL
updated_at timestamp with time zone NOT NULL,
status smallint DEFAULT 0 NOT NULL,
failed_count integer DEFAULT 0 NOT NULL
);
CREATE SEQUENCE merge_request_cleanup_schedules_merge_request_id_seq
......@@ -23988,6 +23990,8 @@ CREATE INDEX index_merge_request_blocks_on_blocked_merge_request_id ON merge_req
CREATE UNIQUE INDEX index_merge_request_cleanup_schedules_on_merge_request_id ON merge_request_cleanup_schedules USING btree (merge_request_id);
CREATE INDEX index_merge_request_cleanup_schedules_on_status ON merge_request_cleanup_schedules USING btree (status);
CREATE UNIQUE INDEX index_merge_request_diff_commit_users_on_name_and_email ON merge_request_diff_commit_users USING btree (name, email);
CREATE INDEX index_merge_request_diff_commits_on_sha ON merge_request_diff_commits USING btree (sha);
......@@ -24120,7 +24124,7 @@ CREATE INDEX index_mirror_data_non_scheduled_or_started ON project_mirror_data U
CREATE UNIQUE INDEX index_mr_blocks_on_blocking_and_blocked_mr_ids ON merge_request_blocks USING btree (blocking_merge_request_id, blocked_merge_request_id);
CREATE INDEX index_mr_cleanup_schedules_timestamps ON merge_request_cleanup_schedules USING btree (scheduled_at) WHERE (completed_at IS NULL);
CREATE INDEX index_mr_cleanup_schedules_timestamps_status ON merge_request_cleanup_schedules USING btree (scheduled_at) WHERE ((completed_at IS NULL) AND (status = 0));
CREATE UNIQUE INDEX index_mr_context_commits_on_merge_request_id_and_sha ON merge_request_context_commits USING btree (merge_request_id, sha);
......@@ -3,6 +3,19 @@
FactoryBot.define do
factory :merge_request_cleanup_schedule, class: 'MergeRequest::CleanupSchedule' do
merge_request
scheduled_at { Time.current }
scheduled_at { 1.day.ago }
trait :running do
status { MergeRequest::CleanupSchedule::STATUSES[:running] }
end
trait :completed do
status { MergeRequest::CleanupSchedule::STATUSES[:completed] }
completed_at { Time.current }
end
trait :failed do
status { MergeRequest::CleanupSchedule::STATUSES[:failed] }
end
end
end
......@@ -11,22 +11,125 @@ RSpec.describe MergeRequest::CleanupSchedule do
it { is_expected.to validate_presence_of(:scheduled_at) }
end
describe '.scheduled_merge_request_ids' do
let_it_be(:mr_cleanup_schedule_1) { create(:merge_request_cleanup_schedule, scheduled_at: 2.days.ago) }
let_it_be(:mr_cleanup_schedule_2) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.ago) }
let_it_be(:mr_cleanup_schedule_3) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.ago, completed_at: Time.current) }
let_it_be(:mr_cleanup_schedule_4) { create(:merge_request_cleanup_schedule, scheduled_at: 4.days.ago) }
let_it_be(:mr_cleanup_schedule_5) { create(:merge_request_cleanup_schedule, scheduled_at: 3.days.ago) }
let_it_be(:mr_cleanup_schedule_6) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.from_now) }
let_it_be(:mr_cleanup_schedule_7) { create(:merge_request_cleanup_schedule, scheduled_at: 5.days.ago) }
it 'only includes incomplete schedule within the specified limit' do
expect(described_class.scheduled_merge_request_ids(4)).to eq([
mr_cleanup_schedule_2.merge_request_id,
mr_cleanup_schedule_1.merge_request_id,
mr_cleanup_schedule_5.merge_request_id,
mr_cleanup_schedule_4.merge_request_id
describe 'state machine transitions' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule) }
it 'sets status to unstarted by default' do
expect(cleanup_schedule).to be_unstarted
end
describe '#run' do
it 'sets the status to running' do
cleanup_schedule.run
expect(cleanup_schedule.reload).to be_running
end
context 'when previous status is not unstarted' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :running) }
it 'does not change status' do
expect { cleanup_schedule.run }.not_to change(cleanup_schedule, :status)
end
end
end
describe '#retry' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :running) }
it 'sets the status to unstarted' do
cleanup_schedule.retry
expect(cleanup_schedule.reload).to be_unstarted
end
it 'increments failed_count' do
expect { cleanup_schedule.retry }.to change(cleanup_schedule, :failed_count).by(1)
end
context 'when previous status is not running' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule) }
it 'does not change status' do
expect { cleanup_schedule.retry }.not_to change(cleanup_schedule, :status)
end
end
end
describe '#complete' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :running) }
it 'sets the status to completed' do
cleanup_schedule.complete
expect(cleanup_schedule.reload).to be_completed
end
it 'sets the completed_at' do
expect { cleanup_schedule.complete }.to change(cleanup_schedule, :completed_at)
end
context 'when previous status is not running' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :completed) }
it 'does not change status' do
expect { cleanup_schedule.complete }.not_to change(cleanup_schedule, :status)
end
end
end
describe '#mark_as_failed' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :running) }
it 'sets the status to failed' do
cleanup_schedule.mark_as_failed
expect(cleanup_schedule.reload).to be_failed
end
it 'increments failed_count' do
expect { cleanup_schedule.mark_as_failed }.to change(cleanup_schedule, :failed_count).by(1)
end
context 'when previous status is not running' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :failed) }
it 'does not change status' do
expect { cleanup_schedule.mark_as_failed }.not_to change(cleanup_schedule, :status)
end
end
end
end
describe '.scheduled_and_unstarted' do
let!(:cleanup_schedule_1) { create(:merge_request_cleanup_schedule, scheduled_at: 2.days.ago) }
let!(:cleanup_schedule_2) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.ago) }
let!(:cleanup_schedule_3) { create(:merge_request_cleanup_schedule, :completed, scheduled_at: 1.day.ago) }
let!(:cleanup_schedule_4) { create(:merge_request_cleanup_schedule, scheduled_at: 4.days.ago) }
let!(:cleanup_schedule_5) { create(:merge_request_cleanup_schedule, scheduled_at: 3.days.ago) }
let!(:cleanup_schedule_6) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.from_now) }
let!(:cleanup_schedule_7) { create(:merge_request_cleanup_schedule, :failed, scheduled_at: 5.days.ago) }
it 'returns records that are scheduled before or on current time and unstarted (ordered by scheduled first)' do
expect(described_class.scheduled_and_unstarted).to eq([
cleanup_schedule_2,
cleanup_schedule_1,
cleanup_schedule_5,
cleanup_schedule_4
])
end
end
describe '.start_next' do
let!(:cleanup_schedule_1) { create(:merge_request_cleanup_schedule, :completed, scheduled_at: 1.day.ago) }
let!(:cleanup_schedule_2) { create(:merge_request_cleanup_schedule, scheduled_at: 2.days.ago) }
let!(:cleanup_schedule_3) { create(:merge_request_cleanup_schedule, :running, scheduled_at: 1.day.ago) }
let!(:cleanup_schedule_4) { create(:merge_request_cleanup_schedule, scheduled_at: 3.days.ago) }
let!(:cleanup_schedule_5) { create(:merge_request_cleanup_schedule, :failed, scheduled_at: 3.days.ago) }
it 'finds the next scheduled and unstarted then marked it as running' do
expect(described_class.start_next).to eq(cleanup_schedule_2)
expect(cleanup_schedule_2.reload).to be_running
end
end
end
......@@ -3,18 +3,41 @@
require 'spec_helper'
RSpec.describe MergeRequestCleanupRefsWorker do
describe '#perform' do
context 'when merge request exists' do
let(:merge_request) { create(:merge_request) }
let(:job_args) { merge_request.id }
let(:worker) { described_class.new }
include_examples 'an idempotent worker' do
it 'calls MergeRequests::CleanupRefsService#execute' do
expect_next_instance_of(MergeRequests::CleanupRefsService, merge_request) do |svc|
expect(svc).to receive(:execute).and_call_original
end.twice
describe '#perform_work' do
context 'when next cleanup schedule is found' do
let(:failed_count) { 0 }
let!(:cleanup_schedule) { create(:merge_request_cleanup_schedule, failed_count: failed_count) }
subject
it 'marks the cleanup schedule as completed on success' do
stub_cleanup_service(status: :success)
worker.perform_work
expect(cleanup_schedule.reload).to be_completed
expect(cleanup_schedule.completed_at).to be_present
end
context 'when service fails' do
before do
stub_cleanup_service(status: :error)
worker.perform_work
end
it 'marks the cleanup schedule as unstarted and track the failure' do
expect(cleanup_schedule.reload).to be_unstarted
expect(cleanup_schedule.failed_count).to eq(1)
expect(cleanup_schedule.completed_at).to be_nil
end
context "and cleanup schedule has already failed #{described_class::FAILURE_THRESHOLD} times" do
let(:failed_count) { described_class::FAILURE_THRESHOLD }
it 'marks the cleanup schedule as failed and track the failure' do
expect(cleanup_schedule.reload).to be_failed
expect(cleanup_schedule.failed_count).to eq(described_class::FAILURE_THRESHOLD + 1)
expect(cleanup_schedule.completed_at).to be_nil
end
end
end
......@@ -23,20 +46,52 @@ RSpec.describe MergeRequestCleanupRefsWorker do
stub_feature_flags(merge_request_refs_cleanup: false)
end
it 'does not clean up the merge request' do
it 'does nothing' do
expect(MergeRequests::CleanupRefsService).not_to receive(:new)
perform_multiple(1)
worker.perform_work
end
end
end
context 'when merge request does not exist' do
it 'does not call MergeRequests::CleanupRefsService' do
context 'when there is no next cleanup schedule found' do
it 'does nothing' do
expect(MergeRequests::CleanupRefsService).not_to receive(:new)
perform_multiple(1)
worker.perform_work
end
end
end
describe '#remaining_work_count' do
let_it_be(:unstarted) { create_list(:merge_request_cleanup_schedule, 2) }
let_it_be(:running) { create_list(:merge_request_cleanup_schedule, 2, :running) }
let_it_be(:completed) { create_list(:merge_request_cleanup_schedule, 2, :completed) }
it 'returns number of scheduled and unstarted cleanup schedule records' do
expect(worker.remaining_work_count).to eq(unstarted.count)
end
context 'when count exceeds max_running_jobs' do
before do
create_list(:merge_request_cleanup_schedule, worker.max_running_jobs)
end
it 'gets capped at max_running_jobs' do
expect(worker.remaining_work_count).to eq(worker.max_running_jobs)
end
end
end
describe '#max_running_jobs' do
it 'returns the value of MAX_RUNNING_JOBS' do
expect(worker.max_running_jobs).to eq(described_class::MAX_RUNNING_JOBS)
end
end
def stub_cleanup_service(result)
expect_next_instance_of(MergeRequests::CleanupRefsService, cleanup_schedule.merge_request) do |svc|
expect(svc).to receive(:execute).and_return(result)
end
end
end
......@@ -6,16 +6,9 @@ RSpec.describe ScheduleMergeRequestCleanupRefsWorker do
subject(:worker) { described_class.new }
describe '#perform' do
before do
allow(MergeRequest::CleanupSchedule)
.to receive(:scheduled_merge_request_ids)
.with(described_class::LIMIT)
.and_return([1, 2, 3, 4])
end
it 'does nothing if the database is read-only' do
allow(Gitlab::Database).to receive(:read_only?).and_return(true)
expect(MergeRequestCleanupRefsWorker).not_to receive(:bulk_perform_in)
expect(MergeRequestCleanupRefsWorker).not_to receive(:perform_with_capacity)
worker.perform
end
......@@ -26,25 +19,17 @@ RSpec.describe ScheduleMergeRequestCleanupRefsWorker do
end
it 'does not schedule any merge request clean ups' do
expect(MergeRequestCleanupRefsWorker).not_to receive(:bulk_perform_in)
expect(MergeRequestCleanupRefsWorker).not_to receive(:perform_with_capacity)
worker.perform
end
end
include_examples 'an idempotent worker' do
it 'schedules MergeRequestCleanupRefsWorker to be performed by batch' do
expect(MergeRequestCleanupRefsWorker)
.to receive(:bulk_perform_in)
.with(
described_class::DELAY,
[[1], [2], [3], [4]],
batch_size: described_class::BATCH_SIZE
)
it 'schedules MergeRequestCleanupRefsWorker to be performed with capacity' do
expect(MergeRequestCleanupRefsWorker).to receive(:perform_with_capacity).twice
expect(worker).to receive(:log_extra_metadata_on_done).with(:merge_requests_count, 4)
worker.perform
subject
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment