Respects Geo secondary node max capacity per shard

parent 45115cd0
...@@ -31,17 +31,19 @@ module Geo ...@@ -31,17 +31,19 @@ module Geo
update_jobs_in_progress update_jobs_in_progress
# This will stream the results one by one unless over_capacity?
# until there are no more results to fetch. # This will stream the results one by one
result = connection.get_result # until there are no more results to fetch.
break :complete if result.nil? result = connection.get_result
break :complete if result.nil?
result.check
result.each do |row| result.check
schedule_job(row['id']) result.each do |row|
schedule_job(row['id'])
end
else
sleep(1)
end end
sleep(1) if over_capacity?
end end
rescue => error rescue => error
reason = :error reason = :error
...@@ -93,12 +95,15 @@ module Geo ...@@ -93,12 +95,15 @@ module Geo
end end
def update_jobs_in_progress def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids) job_ids = scheduled_job_ids
return if job_ids.empty?
# SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise. # SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to make { job_id: 123 } -> [ { job_id: 123 }, bool ] # For each entry, first use `zip` to make { job_id: 123 } -> [ { job_id: 123 }, bool ]
# Next, filter out the jobs that have completed. # Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact @scheduled_jobs = Gitlab::SidekiqStatus.job_status(scheduled_job_ids).then do |status|
@scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
end
end end
def scheduled_job_ids def scheduled_job_ids
......
...@@ -90,10 +90,14 @@ describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab ...@@ -90,10 +90,14 @@ describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab
it 'respects Geo secondary node max capacity per shard' do it 'respects Geo secondary node max capacity per shard' do
stub_healthy_shards([shard_name, 'shard2', 'shard3', 'shard4', 'shard5']) stub_healthy_shards([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
allow(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return('jid-123') project_1 = create(:project)
create_list(:project, 2) project_2 = create(:project)
allow(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_1.id, anything).and_return('jid-1')
expect(subject).to receive(:sleep).twice.and_call_original allow(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_2.id, anything).and_return('jid-2')
allow(Gitlab::SidekiqStatus).to receive(:job_status).with(['jid-2']).and_return([true], [false])
allow(Gitlab::SidekiqStatus).to receive(:job_status).with(['jid-1']).and_return([false])
expect(subject).to receive(:sleep).once.and_call_original
subject.perform(shard_name) subject.perform(shard_name)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment