Commit 64270139 authored by Sean McGivern's avatar Sean McGivern

Merge branch 'dm-schedule-mirror-updates-in-parallel' into 'master'

Schedule mirror updates in parallel

Closes #5034

See merge request gitlab-org/gitlab-ee!4720
parents f0add7bc 5bd51666
...@@ -137,6 +137,7 @@ ...@@ -137,6 +137,7 @@
- elastic_indexer - elastic_indexer
- export_csv - export_csv
- ldap_group_sync - ldap_group_sync
- project_import_schedule
- project_update_repository_storage - project_update_repository_storage
- rebase - rebase
- repository_update_mirror - repository_update_mirror
......
...@@ -75,6 +75,7 @@ ...@@ -75,6 +75,7 @@
- [repository_remove_remote, 1] - [repository_remove_remote, 1]
- [repository_update_mirror, 1] - [repository_update_mirror, 1]
- [repository_update_remote_mirror, 1] - [repository_update_remote_mirror, 1]
- [project_import_schedule, 1]
- [project_update_repository_storage, 1] - [project_update_repository_storage, 1]
- [admin_emails, 1] - [admin_emails, 1]
- [elastic_batch_project_indexer, 1] - [elastic_batch_project_indexer, 1]
......
...@@ -490,7 +490,7 @@ module EE ...@@ -490,7 +490,7 @@ module EE
def load_licensed_feature_available(feature) def load_licensed_feature_available(feature)
globally_available = License.feature_available?(feature) globally_available = License.feature_available?(feature)
if namespace && ::Gitlab::CurrentSettings.should_check_namespace_plan? if ::Gitlab::CurrentSettings.should_check_namespace_plan? && namespace
globally_available && globally_available &&
(public? && namespace.public? || namespace.feature_available_in_plan?(feature)) (public? && namespace.public? || namespace.feature_available_in_plan?(feature))
else else
......
class ProjectImportScheduleWorker
include ApplicationWorker
prepend WaitableWorker
def perform(project_id)
project = Project.find_by(id: project_id)
project&.import_schedule
end
end
...@@ -3,6 +3,7 @@ class UpdateAllMirrorsWorker ...@@ -3,6 +3,7 @@ class UpdateAllMirrorsWorker
include CronjobQueue include CronjobQueue
LEASE_TIMEOUT = 5.minutes LEASE_TIMEOUT = 5.minutes
SCHEDULE_WAIT_TIMEOUT = 4.minutes
LEASE_KEY = 'update_all_mirrors'.freeze LEASE_KEY = 'update_all_mirrors'.freeze
def perform def perform
...@@ -15,29 +16,31 @@ class UpdateAllMirrorsWorker ...@@ -15,29 +16,31 @@ class UpdateAllMirrorsWorker
end end
def schedule_mirrors! def schedule_mirrors!
capacity = batch_size = Gitlab::Mirror.available_capacity capacity = Gitlab::Mirror.available_capacity
# Ignore mirrors that become due for scheduling once work begins, so we # Ignore mirrors that become due for scheduling once work begins, so we
# can't end up in an infinite loop # can't end up in an infinite loop
now = Time.now now = Time.now
last = nil last = nil
all_project_ids = []
# Normally, this will complete in 1-2 batches. One batch will be added per
# `batch_size` unlicensed projects in the database.
while capacity > 0 while capacity > 0
projects = pull_mirrors_batch(freeze_at: now, batch_size: batch_size, offset_at: last) batch_size = [capacity * 2, 500].min
projects = pull_mirrors_batch(freeze_at: now, batch_size: batch_size, offset_at: last).to_a
break if projects.empty? break if projects.empty?
last = projects.last.mirror_data.next_execution_timestamp project_ids = projects.lazy.select(&:mirror?).take(capacity).map(&:id).force
capacity -= project_ids.length
all_project_ids.concat(project_ids)
projects.each do |project| # If fewer than `batch_size` projects were returned, we don't need to query again
next unless project.mirror? break if projects.length < batch_size
capacity -= 1 last = projects.last.mirror_data.next_execution_timestamp
project.import_schedule
break unless capacity > 0
end
end end
ProjectImportScheduleWorker.bulk_perform_and_wait(all_project_ids.map { |id| [id] }, timeout: SCHEDULE_WAIT_TIMEOUT.to_i)
end end
private private
...@@ -51,7 +54,11 @@ class UpdateAllMirrorsWorker ...@@ -51,7 +54,11 @@ class UpdateAllMirrorsWorker
end end
def pull_mirrors_batch(freeze_at:, batch_size:, offset_at: nil) def pull_mirrors_batch(freeze_at:, batch_size:, offset_at: nil)
relation = Project.mirrors_to_sync(freeze_at).reorder('project_mirror_data.next_execution_timestamp').limit(batch_size) relation = Project
.mirrors_to_sync(freeze_at)
.reorder('project_mirror_data.next_execution_timestamp')
.limit(batch_size)
.includes(:namespace) # Used by `project.mirror?`
relation = relation.where('next_execution_timestamp > ?', offset_at) if offset_at relation = relation.where('next_execution_timestamp > ?', offset_at) if offset_at
......
---
title: Schedule mirror updates in parallel
merge_request:
author:
type: changed
...@@ -13,7 +13,7 @@ describe UpdateAllMirrorsWorker do ...@@ -13,7 +13,7 @@ describe UpdateAllMirrorsWorker do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(false) allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(false)
expect(worker).not_to receive(:fail_stuck_mirrors!) expect(worker).not_to receive(:schedule_mirrors!)
worker.perform worker.perform
end end
...@@ -29,7 +29,9 @@ describe UpdateAllMirrorsWorker do ...@@ -29,7 +29,9 @@ describe UpdateAllMirrorsWorker do
def schedule_mirrors!(capacity:) def schedule_mirrors!(capacity:)
allow(Gitlab::Mirror).to receive_messages(available_capacity: capacity) allow(Gitlab::Mirror).to receive_messages(available_capacity: capacity)
Sidekiq::Testing.fake! do allow_any_instance_of(RepositoryImportWorker).to receive(:perform)
Sidekiq::Testing.inline! do
worker.schedule_mirrors! worker.schedule_mirrors!
end end
end end
...@@ -74,29 +76,82 @@ describe UpdateAllMirrorsWorker do ...@@ -74,29 +76,82 @@ describe UpdateAllMirrorsWorker do
allow(Gitlab).to receive_messages(com?: true) allow(Gitlab).to receive_messages(com?: true)
end end
let!(:unlicensed_project) { scheduled_mirror(at: 4.weeks.ago, licensed: false) } let!(:unlicensed_project1) { scheduled_mirror(at: 8.weeks.ago, licensed: false) }
let!(:earliest_project) { scheduled_mirror(at: 3.weeks.ago, licensed: true) } let!(:unlicensed_project2) { scheduled_mirror(at: 7.weeks.ago, licensed: false) }
let!(:latest_project) { scheduled_mirror(at: 2.weeks.ago, licensed: true) } let!(:licensed_project1) { scheduled_mirror(at: 6.weeks.ago, licensed: true) }
let!(:unlicensed_project3) { scheduled_mirror(at: 5.weeks.ago, licensed: false) }
let!(:licensed_project2) { scheduled_mirror(at: 4.weeks.ago, licensed: true) }
let!(:unlicensed_project4) { scheduled_mirror(at: 3.weeks.ago, licensed: false) }
let!(:licensed_project3) { scheduled_mirror(at: 1.week.ago, licensed: true) }
let(:unlicensed_projects) { [unlicensed_project1, unlicensed_project2, unlicensed_project3, unlicensed_project4] }
context 'when capacity is in excess' do
it "schedules all available mirrors" do
schedule_mirrors!(capacity: 4)
expect_import_scheduled(licensed_project1, licensed_project2, licensed_project3)
expect_import_not_scheduled(*unlicensed_projects)
end
it "schedules all available mirrors when capacity is in excess" do it 'requests as many batches as necessary' do
schedule_mirrors!(capacity: 3) # The first batch will only contain 3 licensed mirrors, but since we have
# fewer than 8 mirrors in total, there's no need to request another batch
expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 8)).and_call_original
expect_import_scheduled(earliest_project, latest_project) schedule_mirrors!(capacity: 4)
expect_import_not_scheduled(unlicensed_project) end
end end
it "schedules all available mirrors when capacity is sufficient" do context 'when capacity is exacly sufficient' do
schedule_mirrors!(capacity: 2) it "schedules all available mirrors" do
schedule_mirrors!(capacity: 3)
expect_import_scheduled(earliest_project, latest_project) expect_import_scheduled(licensed_project1, licensed_project2, licensed_project3)
expect_import_not_scheduled(unlicensed_project) expect_import_not_scheduled(*unlicensed_projects)
end
it 'requests as many batches as necessary' do
# The first batch will only contain 2 licensed mirrors, so we need to request another batch
expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 6)).ordered.and_call_original
expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 2)).ordered.and_call_original
schedule_mirrors!(capacity: 3)
end
end end
it 'schedules mirrors by next_execution_timestamp when capacity is insufficient' do context 'when capacity is insufficient' do
schedule_mirrors!(capacity: 1) it 'schedules mirrors by next_execution_timestamp' do
schedule_mirrors!(capacity: 2)
expect_import_scheduled(licensed_project1, licensed_project2)
expect_import_not_scheduled(*unlicensed_projects, licensed_project3)
end
it 'requests as many batches as necessary' do
# The first batch will only contain 1 licensed mirror, so we need to request another batch
expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 4)).ordered.and_call_original
expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 2)).ordered.and_call_original
schedule_mirrors!(capacity: 2)
end
end
context 'when capacity is insufficient and the first batch is empty' do
it 'schedules mirrors by next_execution_timestamp' do
schedule_mirrors!(capacity: 1)
expect_import_scheduled(licensed_project1)
expect_import_not_scheduled(*unlicensed_projects, licensed_project2, licensed_project3)
end
it 'requests as many batches as necessary' do
# The first batch will not contain any licensed mirrors, so we need to request another batch
expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 2)).ordered.and_call_original
expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 2)).ordered.and_call_original
expect_import_scheduled(earliest_project) schedule_mirrors!(capacity: 1)
expect_import_not_scheduled(unlicensed_project, latest_project) end
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment