Commit a38ecbc1 authored by Quang-Minh Nguyen's avatar Quang-Minh Nguyen

Make repository pull mirroring not depend on sidekiq queue sizes

Issue: https://gitlab.com/gitlab-org/gitlab/-/issues/340630
parent d75114c8
...@@ -4,6 +4,10 @@ class ProjectImportScheduleWorker ...@@ -4,6 +4,10 @@ class ProjectImportScheduleWorker
ImportStateNotFound = Class.new(StandardError) ImportStateNotFound = Class.new(StandardError)
include ApplicationWorker include ApplicationWorker
# At the moment, this inclusion is to enable job tracking ability. In the
# future, the capacity management should be moved to this worker instead of
# UpdateAllMirrorsWorker
include LimitedCapacity::Worker
data_consistency :always data_consistency :always
prepend WaitableWorker prepend WaitableWorker
...@@ -21,6 +25,8 @@ class ProjectImportScheduleWorker ...@@ -21,6 +25,8 @@ class ProjectImportScheduleWorker
tags :needs_own_queue tags :needs_own_queue
def perform(project_id) def perform(project_id)
job_tracker.register(jid, capacity)
return if Gitlab::Database.read_only? return if Gitlab::Database.read_only?
project = Project.with_route.with_import_state.with_namespace.find_by_id(project_id) project = Project.with_route.with_import_state.with_namespace.find_by_id(project_id)
...@@ -29,5 +35,17 @@ class ProjectImportScheduleWorker ...@@ -29,5 +35,17 @@ class ProjectImportScheduleWorker
with_context(project: project) do with_context(project: project) do
project.import_state.schedule project.import_state.schedule
end end
ensure
job_tracker.remove(jid)
end
private
def capacity
Gitlab::Mirror.available_capacity
end
def job_tracker
@job_tracker ||= LimitedCapacity::JobTracker.new(self.class.name)
end end
end end
...@@ -18,7 +18,16 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -18,7 +18,16 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
scheduled = 0 scheduled = 0
with_lease do with_lease do
# Clean-up completed jobs with stale status
job_tracker.clean_up
scheduled = schedule_mirrors! scheduled = schedule_mirrors!
if scheduled > 0
# Wait for all ProjectImportScheduleWorker jobs to complete
deadline = Time.current + SCHEDULE_WAIT_TIMEOUT
sleep 1 while job_tracker.count > 0 && Time.current < deadline
end
end end
# If we didn't get the lease, or no updates were scheduled, exit early # If we didn't get the lease, or no updates were scheduled, exit early
...@@ -73,12 +82,6 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -73,12 +82,6 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
last = projects.last.import_state.next_execution_timestamp last = projects.last.import_state.next_execution_timestamp
end end
if scheduled > 0
# Wait for all ProjectImportScheduleWorker jobs to complete
deadline = Time.current + SCHEDULE_WAIT_TIMEOUT
sleep 1 while ProjectImportScheduleWorker.queue_size > 0 && Time.current < deadline
end
scheduled scheduled
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
...@@ -156,4 +159,8 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -156,4 +159,8 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
end.to_sql end.to_sql
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
def job_tracker
@job_tracker ||= LimitedCapacity::JobTracker.new(ProjectImportScheduleWorker.name)
end
end end
...@@ -11,7 +11,14 @@ RSpec.describe ProjectImportScheduleWorker do ...@@ -11,7 +11,14 @@ RSpec.describe ProjectImportScheduleWorker do
let(:job_args) { [project.id] } let(:job_args) { [project.id] }
let(:job_tracker_instance) { double(LimitedCapacity::JobTracker) }
before do before do
allow(Gitlab::Mirror).to receive(:available_capacity).and_return(5)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
allow(job_tracker_instance).to receive(:register)
allow(job_tracker_instance).to receive(:remove)
allow(Project).to receive(:find_by_id).with(project.id).and_return(project) allow(Project).to receive(:find_by_id).with(project.id).and_return(project)
allow(project).to receive(:add_import_job) allow(project).to receive(:add_import_job)
end end
...@@ -31,6 +38,13 @@ RSpec.describe ProjectImportScheduleWorker do ...@@ -31,6 +38,13 @@ RSpec.describe ProjectImportScheduleWorker do
expect(import_state).to be_scheduled expect(import_state).to be_scheduled
end end
it 'tracks the status of the worker' do
subject
expect(job_tracker_instance).to have_received(:register).with(any_args, 5).at_least(:once)
expect(job_tracker_instance).to have_received(:remove).with(any_args).at_least(:once)
end
end end
context 'project is not found' do context 'project is not found' do
......
...@@ -56,8 +56,17 @@ RSpec.describe UpdateAllMirrorsWorker do ...@@ -56,8 +56,17 @@ RSpec.describe UpdateAllMirrorsWorker do
end end
context 'when updates were scheduled' do context 'when updates were scheduled' do
let(:job_tracker_instance) { double(LimitedCapacity::JobTracker) }
before do before do
allow(worker).to receive(:schedule_mirrors!).and_return(1) allow(worker).to receive(:schedule_mirrors!).and_return(1)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
count = 3
allow(job_tracker_instance).to receive(:clean_up)
allow(job_tracker_instance).to receive(:register)
allow(job_tracker_instance).to receive(:remove)
allow(job_tracker_instance).to receive(:count) { |_| count -= 1 }
end end
it 'sleeps a bit after scheduling mirrors' do it 'sleeps a bit after scheduling mirrors' do
...@@ -66,6 +75,18 @@ RSpec.describe UpdateAllMirrorsWorker do ...@@ -66,6 +75,18 @@ RSpec.describe UpdateAllMirrorsWorker do
worker.perform worker.perform
end end
it 'cleans up finished ProjectImportSchduleWorker jobs' do
worker.perform
expect(job_tracker_instance).to have_received(:clean_up).once
end
it 'waits until all ProjectImportSchduleWorker jobs to complete' do
worker.perform
expect(job_tracker_instance).to have_received(:count).exactly(3).times
end
context 'if capacity is available' do context 'if capacity is available' do
before do before do
allow(Gitlab::Mirror).to receive(:reschedule_immediately?).and_return(true) allow(Gitlab::Mirror).to receive(:reschedule_immediately?).and_return(true)
...@@ -102,6 +123,20 @@ RSpec.describe UpdateAllMirrorsWorker do ...@@ -102,6 +123,20 @@ RSpec.describe UpdateAllMirrorsWorker do
worker.perform worker.perform
end end
it 'does not poll for ProjectImportSchduleWorker jobs to complete' do
expect_next_instance_of(LimitedCapacity::JobTracker) do |instance|
expect(instance).not_to receive(:count)
end
worker.perform
end
it 'does not wait' do
expect(Kernel).not_to receive(:sleep)
worker.perform
end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment