Commit ef7fa8ed authored by Tiago Botelho's avatar Tiago Botelho

Improves handling of the mirror threshold

parent 95ead8b1
...@@ -5,6 +5,9 @@ class RepositoryUpdateMirrorWorker ...@@ -5,6 +5,9 @@ class RepositoryUpdateMirrorWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue include DedicatedSidekiqQueue
LEASE_KEY = 'repository_update_mirror_worker_start_scheduler'.freeze
LEASE_TIMEOUT = 2.seconds
# Retry not neccessary. It will try again at the next update interval. # Retry not neccessary. It will try again at the next update interval.
sidekiq_options retry: false, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION sidekiq_options retry: false, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
...@@ -30,11 +33,17 @@ class RepositoryUpdateMirrorWorker ...@@ -30,11 +33,17 @@ class RepositoryUpdateMirrorWorker
fail_mirror(project, ex.message) fail_mirror(project, ex.message)
raise UpdateError, "#{ex.class}: #{ex.message}" raise UpdateError, "#{ex.class}: #{ex.message}"
ensure ensure
UpdateAllMirrorsWorker.perform_async if Gitlab::Mirror.threshold_reached? if !lease.exists? && Gitlab::Mirror.reschedule_immediately? && lease.try_obtain
UpdateAllMirrorsWorker.perform_async
end
end end
private private
def lease
@lease ||= ::Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT)
end
def start_mirror(project) def start_mirror(project)
if project.import_start if project.import_start
Gitlab::Mirror.increment_metric(:mirrors_running, 'Mirrors running count') Gitlab::Mirror.increment_metric(:mirrors_running, 'Mirrors running count')
......
...@@ -51,12 +51,7 @@ class UpdateAllMirrorsWorker ...@@ -51,12 +51,7 @@ class UpdateAllMirrorsWorker
end end
def pull_mirrors_batch(freeze_at:, batch_size:, offset_at: nil) def pull_mirrors_batch(freeze_at:, batch_size:, offset_at: nil)
relation = Project relation = Project.mirrors_to_sync(freeze_at).reorder('project_mirror_data.next_execution_timestamp').limit(batch_size)
.mirror
.joins(:mirror_data)
.where("next_execution_timestamp <= ? AND import_status NOT IN ('scheduled', 'started')", freeze_at)
.reorder('project_mirror_data.next_execution_timestamp')
.limit(batch_size)
relation = relation.where('next_execution_timestamp > ?', offset_at) if offset_at relation = relation.where('next_execution_timestamp > ?', offset_at) if offset_at
......
---
title: Improves handling of the mirror threshold.
merge_request: 2671
author:
...@@ -40,6 +40,7 @@ module EE ...@@ -40,6 +40,7 @@ module EE
scope :with_shared_runners_limit_enabled, -> { with_shared_runners.non_public_only } scope :with_shared_runners_limit_enabled, -> { with_shared_runners.non_public_only }
scope :mirror, -> { where(mirror: true) } scope :mirror, -> { where(mirror: true) }
scope :mirrors_to_sync, ->(freeze_at) { mirror.joins(:mirror_data).without_import_status(:scheduled, :started).where("next_execution_timestamp <= ?", freeze_at) }
scope :with_remote_mirrors, -> { joins(:remote_mirrors).where(remote_mirrors: { enabled: true }).distinct } scope :with_remote_mirrors, -> { joins(:remote_mirrors).where(remote_mirrors: { enabled: true }).distinct }
scope :with_wiki_enabled, -> { with_feature_enabled(:wiki) } scope :with_wiki_enabled, -> { with_feature_enabled(:wiki) }
......
...@@ -24,8 +24,16 @@ module Gitlab ...@@ -24,8 +24,16 @@ module Gitlab
available_capacity <= 0 available_capacity <= 0
end end
def threshold_reached? def reschedule_immediately?
available_capacity >= capacity_threshold available_spots = available_capacity
return false if available_spots < capacity_threshold
# Only reschedule if we are able to completely fill up the available spots.
mirrors_ready_to_sync_count >= available_spots
end
def mirrors_ready_to_sync_count
Project.mirrors_to_sync(Time.now).count
end end
def available_capacity def available_capacity
......
...@@ -64,25 +64,59 @@ describe Gitlab::Mirror do ...@@ -64,25 +64,59 @@ describe Gitlab::Mirror do
end end
end end
describe '#threshold_reached?' do describe '#reschedule_immediately?' do
let(:mirror_capacity_threshold) { current_application_settings.mirror_capacity_threshold } let(:mirror_capacity_threshold) { current_application_settings.mirror_capacity_threshold }
it 'returns true if available capacity surpassed defined threshold' do context 'with number of mirrors to sync equal to the available capacity' do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold + 1) it 'returns true if available capacity surpassed defined threshold' do
available_capacity = mirror_capacity_threshold + 1
expect(described_class).to receive(:available_capacity).and_return(available_capacity)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(available_capacity)
expect(described_class.reschedule_immediately?).to eq(true)
end
it 'returns true if available capacity is equal to the defined threshold' do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(mirror_capacity_threshold)
expect(described_class.reschedule_immediately?).to eq(true)
end
end
expect(described_class.threshold_reached?).to eq(true) context 'with number of mirrors to sync surpassing the available capacity' do
it 'returns true if available capacity surpassed defined threshold' do
available_capacity = mirror_capacity_threshold + 1
expect(described_class).to receive(:available_capacity).and_return(available_capacity)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(available_capacity + 1)
expect(described_class.reschedule_immediately?).to eq(true)
end
it 'returns true if available capacity is equal to the defined threshold' do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(mirror_capacity_threshold + 1)
expect(described_class.reschedule_immediately?).to eq(true)
end
end end
it 'returns true if available capacity is equal to the defined threshold' do it 'returns false if mirrors ready to sync is below the available capacity' do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold) expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold + 1)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(mirror_capacity_threshold)
expect(described_class.threshold_reached?).to eq(true) expect(described_class.reschedule_immediately?).to eq(false)
end end
it 'returns false if available capacity is below the defined threshold' do it 'returns false if available capacity is below the defined threshold' do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold - 1) available_capacity = mirror_capacity_threshold - 1
expect(described_class).to receive(:available_capacity).and_return(available_capacity)
expect(described_class).not_to receive(:mirrors_ready_to_sync_count)
expect(described_class.threshold_reached?).to eq(false) expect(described_class.reschedule_immediately?).to eq(false)
end end
after do after do
......
require 'rails_helper' require 'rails_helper'
describe RepositoryUpdateMirrorWorker do describe RepositoryUpdateMirrorWorker do
subject { described_class.new }
describe '#perform' do describe '#perform' do
context 'with status none' do let!(:project) { create(:project, :mirror, :import_scheduled) }
let(:project) { create(:project, :mirror, :import_scheduled) }
it 'sets status as finished when update mirror service executes successfully' do before do
expect_any_instance_of(Projects::UpdateMirrorService).to receive(:execute).and_return(status: :success) allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(true)
end
expect { subject.perform(project.id) }.to change { project.reload.import_status }.to('finished') it 'sets status as finished when update mirror service executes successfully' do
end expect_any_instance_of(Projects::UpdateMirrorService).to receive(:execute).and_return(status: :success)
it 'sets status as failed when update mirror service executes with errors' do expect { subject.perform(project.id) }.to change { project.reload.import_status }.to('finished')
error_message = 'fail!' end
expect_any_instance_of(Projects::UpdateMirrorService).to receive(:execute).and_return(status: :error, message: error_message) it 'sets status as failed when update mirror service executes with errors' do
allow_any_instance_of(Projects::UpdateMirrorService).to receive(:execute).and_return(status: :error, message: 'error')
expect do expect { subject.perform(project.id) }.to raise_error(RepositoryUpdateMirrorWorker::UpdateError, 'error')
subject.perform(project.id) expect(project.reload.import_status).to eq('failed')
end.to raise_error(RepositoryUpdateMirrorWorker::UpdateError, error_message)
expect(project.reload.import_status).to eq('failed')
end
end end
context 'with another worker already running' do context 'with another worker already running' do
...@@ -33,45 +29,55 @@ describe RepositoryUpdateMirrorWorker do ...@@ -33,45 +29,55 @@ describe RepositoryUpdateMirrorWorker do
end end
end end
context 'with unexpected error' do it 'marks mirror as failed when an error occurs' do
it 'marks mirror as failed' do allow_any_instance_of(Projects::UpdateMirrorService).to receive(:execute).and_raise(RuntimeError)
mirror = create(:project, :repository, :mirror, :import_scheduled)
allow_any_instance_of(Projects::UpdateMirrorService).to receive(:execute).and_raise(RuntimeError) expect { subject.perform(project.id) }.to raise_error(RepositoryUpdateMirrorWorker::UpdateError)
expect(project.reload.import_status).to eq('failed')
expect do
subject.perform(mirror.id)
end.to raise_error(RepositoryUpdateMirrorWorker::UpdateError)
expect(mirror.reload.import_status).to eq('failed')
end
end end
context 'threshold_reached?' do context 'reschedule mirrors' do
let(:mirror) { create(:project, :repository, :mirror, :import_scheduled) }
before do before do
expect_any_instance_of(Projects::UpdateMirrorService).to receive(:execute).and_return(status: :success) allow_any_instance_of(Projects::UpdateMirrorService).to receive(:execute).and_return(status: :success)
end end
context 'with threshold_reached? true' do context 'when we obtain the lease' do
it 'schedules UpdateAllMirrorsWorker' do before do
expect(Gitlab::Mirror).to receive(:threshold_reached?).and_return(true) allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(true)
end
it 'performs UpdateAllMirrorsWorker when reschedule_immediately? returns true' do
allow(Gitlab::Mirror).to receive(:reschedule_immediately?).and_return(true)
expect(UpdateAllMirrorsWorker).to receive(:perform_async) expect(UpdateAllMirrorsWorker).to receive(:perform_async).once
subject.perform(mirror.id) subject.perform(project.id)
end end
end
context 'with threshold_reached? false' do it 'does not perform UpdateAllMirrorsWorker when reschedule_immediately? returns false' do
it 'does not schedule UpdateAllMirrorsWorker' do allow(Gitlab::Mirror).to receive(:reschedule_immediately?).and_return(false)
expect(Gitlab::Mirror).to receive(:threshold_reached?).and_return(false)
expect(UpdateAllMirrorsWorker).not_to receive(:perform_async) expect(UpdateAllMirrorsWorker).not_to receive(:perform_async)
subject.perform(mirror.id) subject.perform(project.id)
end end
end end
it 'does not perform UpdateAllMirrorsWorker when we cannot obtain the lease' do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(false)
expect(UpdateAllMirrorsWorker).not_to receive(:perform_async)
subject.perform(project.id)
end
it 'does not perform UpdateAllMirrorsWorker when the lease already exists' do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:exists?).and_return(true)
expect(UpdateAllMirrorsWorker).not_to receive(:perform_async)
subject.perform(project.id)
end
end end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment