Commit ebc5615a authored by Robert Speicher

Merge branch 'tc-repo-check-per-shard-ee' into 'master'

Repo check per shard (EE port)

Closes gitlab-ce#48042

See merge request gitlab-org/gitlab-ee!6287
parents 648056b8 6ce06be9
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
- cronjob:remove_old_web_hook_logs - cronjob:remove_old_web_hook_logs
- cronjob:remove_unreferenced_lfs_objects - cronjob:remove_unreferenced_lfs_objects
- cronjob:repository_archive_cache - cronjob:repository_archive_cache
- cronjob:repository_check_batch - cronjob:repository_check_dispatch
- cronjob:requests_profiles - cronjob:requests_profiles
- cronjob:schedule_update_user_activity - cronjob:schedule_update_user_activity
- cronjob:stuck_ci_jobs - cronjob:stuck_ci_jobs
...@@ -71,6 +71,7 @@ ...@@ -71,6 +71,7 @@
- pipeline_processing:update_head_pipeline_for_merge_request - pipeline_processing:update_head_pipeline_for_merge_request
- repository_check:repository_check_clear - repository_check:repository_check_clear
- repository_check:repository_check_batch
- repository_check:repository_check_single_repository - repository_check:repository_check_single_repository
- default - default
......
# Mixin for workers that schedule one unit of work per repository storage shard.
#
# Shard health comes from Gitlab::HealthChecks::GitalyCheck.readiness. The
# resulting shard names are pushed into Gitlab::ShardHealthCache before any
# shard is yielded to the caller.
module EachShardWorker
  extend ActiveSupport::Concern
  include ::Gitlab::Utils::StrongMemoize

  # Refreshes Gitlab::ShardHealthCache with the eligible shard names and then
  # yields each of those names, one at a time, to the given block.
  def each_eligible_shard
    Gitlab::ShardHealthCache.update(eligible_shard_names)

    eligible_shard_names.each { |name| yield name }
  end

  # Names of the shards that work should be scheduled for.
  # Override when you want to filter out some shards.
  def eligible_shard_names
    healthy_shard_names
  end

  # Shard names taken from the :shard label of every successful readiness
  # result. The list is computed inside strong_memoize(:healthy_shard_names).
  def healthy_shard_names
    strong_memoize(:healthy_shard_names) do
      healthy_ready_shards.map do |check_result|
        check_result.labels[:shard]
      end
    end
  end

  # Readiness results whose `success` value is truthy.
  def healthy_ready_shards
    ready_shards.select { |check_result| check_result.success }
  end

  # Raw readiness results as returned by the Gitaly health check.
  def ready_shards
    Gitlab::HealthChecks::GitalyCheck.readiness
  end
end
...@@ -3,13 +3,18 @@ module RepositoryCheck ...@@ -3,13 +3,18 @@ module RepositoryCheck
prepend ::EE::RepositoryCheck::BatchWorker prepend ::EE::RepositoryCheck::BatchWorker
include ApplicationWorker include ApplicationWorker
include CronjobQueue include RepositoryCheckQueue
RUN_TIME = 3600 RUN_TIME = 3600
BATCH_SIZE = 10_000 BATCH_SIZE = 10_000
def perform attr_reader :shard_name
def perform(shard_name)
@shard_name = shard_name
return unless Gitlab::CurrentSettings.repository_checks_enabled return unless Gitlab::CurrentSettings.repository_checks_enabled
return unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
start = Time.now start = Time.now
...@@ -39,18 +44,22 @@ module RepositoryCheck ...@@ -39,18 +44,22 @@ module RepositoryCheck
end end
def never_checked_project_ids(batch_size) def never_checked_project_ids(batch_size)
Project.where(last_repository_check_at: nil) projects_on_shard.where(last_repository_check_at: nil)
.where('created_at < ?', 24.hours.ago) .where('created_at < ?', 24.hours.ago)
.limit(batch_size).pluck(:id) .limit(batch_size).pluck(:id)
end end
def old_checked_project_ids(batch_size) def old_checked_project_ids(batch_size)
Project.where.not(last_repository_check_at: nil) projects_on_shard.where.not(last_repository_check_at: nil)
.where('last_repository_check_at < ?', 1.month.ago) .where('last_repository_check_at < ?', 1.month.ago)
.reorder(last_repository_check_at: :asc) .reorder(last_repository_check_at: :asc)
.limit(batch_size).pluck(:id) .limit(batch_size).pluck(:id)
end end
def projects_on_shard
Project.where(repository_storage: shard_name)
end
def try_obtain_lease(id) def try_obtain_lease(id)
# Use a 24-hour timeout because on servers/projects where 'git fsck' is # Use a 24-hour timeout because on servers/projects where 'git fsck' is
# super slow we definitely do not want to run it twice in parallel. # super slow we definitely do not want to run it twice in parallel.
......
module RepositoryCheck
  # Fans repository checks out per shard: enqueues one
  # RepositoryCheck::BatchWorker job for every shard yielded by
  # EachShardWorker#each_eligible_shard.
  class DispatchWorker
    include ApplicationWorker
    include CronjobQueue
    include ::EachShardWorker

    # Does nothing (returns nil) when repository checks are disabled in the
    # current application settings; otherwise schedules a batch job per shard.
    def perform
      return unless Gitlab::CurrentSettings.repository_checks_enabled

      each_eligible_shard { |shard| RepositoryCheck::BatchWorker.perform_async(shard) }
    end
  end
end
---
title: Run repository checks in parallel for each shard
merge_request: 20179
author:
type: added
...@@ -301,7 +301,7 @@ Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '50 * * * *' ...@@ -301,7 +301,7 @@ Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '50 * * * *'
Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker' Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker'
Settings.cron_jobs['repository_check_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['repository_check_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['repository_check_worker']['cron'] ||= '20 * * * *' Settings.cron_jobs['repository_check_worker']['cron'] ||= '20 * * * *'
Settings.cron_jobs['repository_check_worker']['job_class'] = 'RepositoryCheck::BatchWorker' Settings.cron_jobs['repository_check_worker']['job_class'] = 'RepositoryCheck::DispatchWorker'
Settings.cron_jobs['admin_email_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['admin_email_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['admin_email_worker']['cron'] ||= '0 0 * * 0' Settings.cron_jobs['admin_email_worker']['cron'] ||= '0 0 * * 0'
Settings.cron_jobs['admin_email_worker']['job_class'] = 'AdminEmailWorker' Settings.cron_jobs['admin_email_worker']['job_class'] = 'AdminEmailWorker'
......
...@@ -22,7 +22,7 @@ module Geo ...@@ -22,7 +22,7 @@ module Geo
end end
shard_name = project.repository_storage shard_name = project.repository_storage
unless Gitlab::Geo::ShardHealthCache.healthy_shard?(shard_name) unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
log_error("Project shard '#{shard_name}' is unhealthy, skipping syncing", project_id: project_id) log_error("Project shard '#{shard_name}' is unhealthy, skipping syncing", project_id: project_id)
return return
end end
......
...@@ -7,7 +7,7 @@ module Geo ...@@ -7,7 +7,7 @@ module Geo
def perform(shard_name) def perform(shard_name)
@shard_name = shard_name @shard_name = shard_name
return unless Gitlab::Geo::ShardHealthCache.healthy_shard?(shard_name) return unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
super() super()
end end
...@@ -24,7 +24,7 @@ module Geo ...@@ -24,7 +24,7 @@ module Geo
end end
def max_capacity def max_capacity
healthy_count = Gitlab::Geo::ShardHealthCache.healthy_shard_count healthy_count = Gitlab::ShardHealthCache.healthy_shard_count
# If we don't have a count, that means that for some reason # If we don't have a count, that means that for some reason
# RepositorySyncWorker stopped running/updating the cache. We might # RepositorySyncWorker stopped running/updating the cache. We might
......
...@@ -9,7 +9,7 @@ module Geo ...@@ -9,7 +9,7 @@ module Geo
def perform(shard_name) def perform(shard_name)
@shard_name = shard_name @shard_name = shard_name
return unless Gitlab::Geo::ShardHealthCache.healthy_shard?(shard_name) return unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
super() super()
end end
......
...@@ -9,7 +9,7 @@ module Geo ...@@ -9,7 +9,7 @@ module Geo
def perform(shard_name) def perform(shard_name)
@shard_name = shard_name @shard_name = shard_name
return unless Gitlab::Geo::ShardHealthCache.healthy_shard?(shard_name) return unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
super() super()
end end
......
...@@ -3,43 +3,16 @@ module Geo ...@@ -3,43 +3,16 @@ module Geo
class PerShardSchedulerWorker class PerShardSchedulerWorker
include ApplicationWorker include ApplicationWorker
include CronjobQueue include CronjobQueue
include ::Gitlab::Utils::StrongMemoize
include ::Gitlab::Geo::LogHelpers include ::Gitlab::Geo::LogHelpers
include ::EachShardWorker
HEALTHY_SHARD_CHECKS = [
Gitlab::HealthChecks::GitalyCheck
].freeze
def perform def perform
Gitlab::Geo::ShardHealthCache.update(eligible_shard_names) each_eligible_shard { |shard_name| schedule_job(shard_name) }
eligible_shard_names.each { |shard_name| schedule_job(shard_name) }
end
def eligible_shard_names
healthy_shard_names
end end
def schedule_job(shard_name) def schedule_job(shard_name)
raise NotImplementedError raise NotImplementedError
end end
def healthy_shard_names
strong_memoize(:healthy_shard_names) do
# For now, we need to perform both Gitaly and direct filesystem checks to ensure
# the shard is healthy. We take the intersection of the successful checks
# as the healthy shards.
healthy_ready_shards.map { |result| result.labels[:shard] }.compact.uniq
end
end
def ready_shards
HEALTHY_SHARD_CHECKS.map(&:readiness)
end
def healthy_ready_shards
ready_shards.map { |result| result.select(&:success) }.inject(:&)
end
end end
end end
end end
...@@ -26,7 +26,7 @@ module Gitlab ...@@ -26,7 +26,7 @@ module Gitlab
def healthy_shard_for?(event) def healthy_shard_for?(event)
return true unless event.respond_to?(:project) return true unless event.respond_to?(:project)
Gitlab::Geo::ShardHealthCache.healthy_shard?(event.project.repository_storage) Gitlab::ShardHealthCache.healthy_shard?(event.project.repository_storage)
end end
def enqueue_job_if_shard_healthy(event) def enqueue_job_if_shard_healthy(event)
......
module Gitlab
  module Geo
    # Keeps the names of healthy shards in a Redis set stored under
    # HEALTHY_SHARDS_KEY; every update re-arms the key's expiry.
    class ShardHealthCache
      HEALTHY_SHARDS_KEY = 'gitlab-geo-healthy-shards'.freeze
      HEALTHY_SHARDS_TIMEOUT = 300

      class << self
        # Deletes the Redis set that holds the healthy shard names.
        def clear
          Gitlab::Redis::Cache.with do |redis|
            redis.del(HEALTHY_SHARDS_KEY)
          end
        end

        # Replaces the stored set of healthy shards.
        #
        # shards - An array of shard names to store
        #
        # The delete, the per-shard adds and the expiry are all issued inside
        # a single `multi` block.
        def update(shards)
          Gitlab::Redis::Cache.with do |redis|
            redis.multi do |transaction|
              transaction.del(HEALTHY_SHARDS_KEY)

              shards.each do |name|
                transaction.sadd(HEALTHY_SHARDS_KEY, name)
              end

              transaction.expire(HEALTHY_SHARDS_KEY, HEALTHY_SHARDS_TIMEOUT)
            end
          end
        end

        # Returns the members of the healthy-shards set.
        def cached_healthy_shards
          Gitlab::Redis::Cache.with do |redis|
            redis.smembers(HEALTHY_SHARDS_KEY)
          end
        end

        # Tells whether the given shard name is a member of the healthy set.
        #
        # shard_name - The string to check
        def healthy_shard?(shard_name)
          Gitlab::Redis::Cache.with do |redis|
            redis.sismember(HEALTHY_SHARDS_KEY, shard_name)
          end
        end

        # Returns how many shard names the healthy set currently holds.
        def healthy_shard_count
          Gitlab::Redis::Cache.with do |redis|
            redis.scard(HEALTHY_SHARDS_KEY)
          end
        end
      end
    end
  end
end
...@@ -90,7 +90,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -90,7 +90,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let!(:registry) { create(:geo_project_registry, :synced, project: project) } let!(:registry) { create(:geo_project_registry, :synced, project: project) }
before do before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true) allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
end end
it 'replays events for projects that belong to selected namespaces to replicate' do it 'replays events for projects that belong to selected namespaces to replicate' do
......
...@@ -37,7 +37,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryCreatedEvent, :postgresql, :c ...@@ -37,7 +37,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryCreatedEvent, :postgresql, :c
describe '#process' do describe '#process' do
before do before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(healthy) allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(healthy)
end end
context 'when the associated shard is healthy' do context 'when the associated shard is healthy' do
......
...@@ -18,7 +18,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c ...@@ -18,7 +18,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('broken').and_return(false) allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('broken').and_return(false)
end end
RSpec.shared_examples 'RepositoryUpdatedEvent' do RSpec.shared_examples 'RepositoryUpdatedEvent' do
...@@ -81,7 +81,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c ...@@ -81,7 +81,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
let(:now) { Time.now } let(:now) { Time.now }
before do before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(healthy) allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(healthy)
end end
context 'when the associated shard is healthy' do context 'when the associated shard is healthy' do
......
...@@ -3,8 +3,13 @@ require 'spec_helper' ...@@ -3,8 +3,13 @@ require 'spec_helper'
describe EE::RepositoryCheck::BatchWorker do describe EE::RepositoryCheck::BatchWorker do
include ::EE::GeoHelpers include ::EE::GeoHelpers
let(:shard_name) { 'default' }
subject(:worker) { RepositoryCheck::BatchWorker.new } subject(:worker) { RepositoryCheck::BatchWorker.new }
before do
Gitlab::ShardHealthCache.update([shard_name])
end
context 'Geo primary' do context 'Geo primary' do
set(:primary) { create(:geo_node, :primary) } set(:primary) { create(:geo_node, :primary) }
...@@ -13,9 +18,9 @@ describe EE::RepositoryCheck::BatchWorker do ...@@ -13,9 +18,9 @@ describe EE::RepositoryCheck::BatchWorker do
end end
it 'loads project ids from main database' do it 'loads project ids from main database' do
projects = create_list(:project, 3, created_at: 1.week.ago) projects = create_list(:project, 3, created_at: 1.week.ago, repository_storage: shard_name)
expect(worker.perform).to eq(projects.map(&:id)) expect(worker.perform(shard_name)).to match_array(projects.map(&:id))
end end
end end
...@@ -28,16 +33,24 @@ describe EE::RepositoryCheck::BatchWorker do ...@@ -28,16 +33,24 @@ describe EE::RepositoryCheck::BatchWorker do
it 'loads project ids from tracking database' do it 'loads project ids from tracking database' do
project_registries = create_list(:geo_project_registry, 3, :synced) project_registries = create_list(:geo_project_registry, 3, :synced)
update_project_registry_shard(project_registries, shard_name)
expect(worker.perform).to eq(project_registries.map(&:project_id)) expect(worker.perform(shard_name)).to match_array(project_registries.map(&:project_id))
end end
it 'loads project ids that were checked more than a month ago from tracking database' do it 'loads project ids that were checked more than a month ago from tracking database' do
project_registries = create_list(:geo_project_registry, 3, :synced, project_registries = create_list(:geo_project_registry, 3, :synced,
last_repository_check_failed: false, last_repository_check_failed: false,
last_repository_check_at: 42.days.ago) last_repository_check_at: 42.days.ago)
update_project_registry_shard(project_registries, shard_name)
expect(worker.perform(shard_name)).to match_array(project_registries.map(&:project_id))
end
end
expect(worker.perform).to eq(project_registries.map(&:project_id)) def update_project_registry_shard(project_registries, shard_name)
project_registries.each do |registry|
Project.find(registry.project_id).update_column(:repository_storage, shard_name)
end end
end end
end end
...@@ -8,10 +8,10 @@ RSpec.describe Geo::ProjectSyncWorker do ...@@ -8,10 +8,10 @@ RSpec.describe Geo::ProjectSyncWorker do
let(:wiki_sync_service) { spy } let(:wiki_sync_service) { spy }
before do before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?) allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?)
.with(project.repository_storage).once.and_return(true) .with(project.repository_storage).once.and_return(true)
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?) allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?)
.with(project_with_broken_storage.repository_storage).once.and_return(false) .with(project_with_broken_storage.repository_storage).once.and_return(false)
allow(Geo::RepositorySyncService).to receive(:new) allow(Geo::RepositorySyncService).to receive(:new)
......
...@@ -28,7 +28,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -28,7 +28,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true } allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true } allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }
Gitlab::Geo::ShardHealthCache.update([shard_name]) Gitlab::ShardHealthCache.update([shard_name])
end end
it 'performs Geo::ProjectSyncWorker for each project' do it 'performs Geo::ProjectSyncWorker for each project' do
...@@ -47,8 +47,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -47,8 +47,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
end end
it 'does not perform Geo::ProjectSyncWorker when shard becomes unhealthy' do it 'does not perform Geo::ProjectSyncWorker when shard becomes unhealthy' do
Gitlab::Geo::ShardHealthCache.update([]) Gitlab::ShardHealthCache.update([])
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name) subject.perform(shard_name)
...@@ -108,7 +107,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -108,7 +107,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
it 'uses two loops to schedule jobs' do it 'uses two loops to schedule jobs' do
expect(subject).to receive(:schedule_jobs).twice.and_call_original expect(subject).to receive(:schedule_jobs).twice.and_call_original
Gitlab::Geo::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5']) Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5) secondary.update!(repos_max_capacity: 5)
subject.perform(shard_name) subject.perform(shard_name)
......
...@@ -16,7 +16,7 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_ ...@@ -16,7 +16,7 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true } allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true } allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }
Gitlab::Geo::ShardHealthCache.update([shard_name]) Gitlab::ShardHealthCache.update([shard_name])
end end
it 'performs Geo::RepositoryVerification::Primary::SingleWorker for each project' do it 'performs Geo::RepositoryVerification::Primary::SingleWorker for each project' do
...@@ -66,7 +66,7 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_ ...@@ -66,7 +66,7 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_
it 'does not perform Geo::RepositoryVerification::Primary::SingleWorker when shard becomes unhealthy' do it 'does not perform Geo::RepositoryVerification::Primary::SingleWorker when shard becomes unhealthy' do
create(:project) create(:project)
Gitlab::Geo::ShardHealthCache.update([]) Gitlab::ShardHealthCache.update([])
expect(primary_singleworker).not_to receive(:perform_async) expect(primary_singleworker).not_to receive(:perform_async)
......
...@@ -19,7 +19,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea ...@@ -19,7 +19,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true } allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true } allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }
Gitlab::Geo::ShardHealthCache.update([shard_name]) Gitlab::ShardHealthCache.update([shard_name])
end end
it 'schedules job for each project' do it 'schedules job for each project' do
...@@ -116,7 +116,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea ...@@ -116,7 +116,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea
it 'does not schedule jobs when shard becomes unhealthy' do it 'does not schedule jobs when shard becomes unhealthy' do
create(:repository_state, project: project) create(:repository_state, project: project)
Gitlab::Geo::ShardHealthCache.update([]) Gitlab::ShardHealthCache.update([])
expect(secondary_singleworker).not_to receive(:perform_async) expect(secondary_singleworker).not_to receive(:perform_async)
......
...@@ -17,12 +17,6 @@ describe Geo::Scheduler::PerShardSchedulerWorker do ...@@ -17,12 +17,6 @@ describe Geo::Scheduler::PerShardSchedulerWorker do
expect(described_class).to include_module(::Gitlab::Geo::LogHelpers) expect(described_class).to include_module(::Gitlab::Geo::LogHelpers)
end end
it 'includes Gitaly health checks' do
expect(described_class::HEALTHY_SHARD_CHECKS).to include(
Gitlab::HealthChecks::GitalyCheck
)
end
describe 'instance methods' do describe 'instance methods' do
subject(:per_shard_scheduler_worker) { described_class.new } subject(:per_shard_scheduler_worker) { described_class.new }
...@@ -47,7 +41,7 @@ describe Geo::Scheduler::PerShardSchedulerWorker do ...@@ -47,7 +41,7 @@ describe Geo::Scheduler::PerShardSchedulerWorker do
end end
describe '#ready_shards' do describe '#ready_shards' do
let(:ready_shards) { [[default_shard, other_shard, unhealthy_shard]] } let(:ready_shards) { [default_shard, other_shard, unhealthy_shard] }
it "returns an array of ready shards" do it "returns an array of ready shards" do
expect(per_shard_scheduler_worker.ready_shards).to eq(ready_shards) expect(per_shard_scheduler_worker.ready_shards).to eq(ready_shards)
......
module Gitlab
  # Keeps the names of healthy shards in a Redis set stored under
  # HEALTHY_SHARDS_KEY; every update re-arms the key's expiry.
  class ShardHealthCache
    HEALTHY_SHARDS_KEY = 'gitlab-healthy-shards'.freeze
    HEALTHY_SHARDS_TIMEOUT = 300

    class << self
      # Deletes the Redis set that holds the healthy shard names.
      def clear
        Gitlab::Redis::Cache.with do |redis|
          redis.del(HEALTHY_SHARDS_KEY)
        end
      end

      # Replaces the stored set of healthy shards.
      #
      # shards - An array of shard names to store
      #
      # The delete, the per-shard adds and the expiry are all issued inside
      # a single `multi` block.
      def update(shards)
        Gitlab::Redis::Cache.with do |redis|
          redis.multi do |transaction|
            transaction.del(HEALTHY_SHARDS_KEY)

            shards.each do |name|
              transaction.sadd(HEALTHY_SHARDS_KEY, name)
            end

            transaction.expire(HEALTHY_SHARDS_KEY, HEALTHY_SHARDS_TIMEOUT)
          end
        end
      end

      # Returns the members of the healthy-shards set.
      def cached_healthy_shards
        Gitlab::Redis::Cache.with do |redis|
          redis.smembers(HEALTHY_SHARDS_KEY)
        end
      end

      # Tells whether the given shard name is a member of the healthy set.
      #
      # shard_name - The string to check
      def healthy_shard?(shard_name)
        Gitlab::Redis::Cache.with do |redis|
          redis.sismember(HEALTHY_SHARDS_KEY, shard_name)
        end
      end

      # Returns how many shard names the healthy set currently holds.
      def healthy_shard_count
        Gitlab::Redis::Cache.with do |redis|
          redis.scard(HEALTHY_SHARDS_KEY)
        end
      end
    end
  end
end
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::ShardHealthCache, :clean_gitlab_redis_cache do describe Gitlab::ShardHealthCache, :clean_gitlab_redis_cache do
let(:shards) { %w(foo bar) } let(:shards) { %w(foo bar) }
before do before do
......
require 'spec_helper' require 'spec_helper'
describe RepositoryCheck::BatchWorker do describe RepositoryCheck::BatchWorker do
let(:shard_name) { 'default' }
subject { described_class.new } subject { described_class.new }
before do
Gitlab::ShardHealthCache.update([shard_name])
end
it 'prefers projects that have never been checked' do it 'prefers projects that have never been checked' do
projects = create_list(:project, 3, created_at: 1.week.ago) projects = create_list(:project, 3, created_at: 1.week.ago)
projects[0].update_column(:last_repository_check_at, 4.months.ago) projects[0].update_column(:last_repository_check_at, 4.months.ago)
projects[2].update_column(:last_repository_check_at, 3.months.ago) projects[2].update_column(:last_repository_check_at, 3.months.ago)
expect(subject.perform).to eq(projects.values_at(1, 0, 2).map(&:id)) expect(subject.perform(shard_name)).to eq(projects.values_at(1, 0, 2).map(&:id))
end end
it 'sorts projects by last_repository_check_at' do it 'sorts projects by last_repository_check_at' do
...@@ -17,7 +22,7 @@ describe RepositoryCheck::BatchWorker do ...@@ -17,7 +22,7 @@ describe RepositoryCheck::BatchWorker do
projects[1].update_column(:last_repository_check_at, 4.months.ago) projects[1].update_column(:last_repository_check_at, 4.months.ago)
projects[2].update_column(:last_repository_check_at, 3.months.ago) projects[2].update_column(:last_repository_check_at, 3.months.ago)
expect(subject.perform).to eq(projects.values_at(1, 2, 0).map(&:id)) expect(subject.perform(shard_name)).to eq(projects.values_at(1, 2, 0).map(&:id))
end end
it 'excludes projects that were checked recently' do it 'excludes projects that were checked recently' do
...@@ -26,7 +31,14 @@ describe RepositoryCheck::BatchWorker do ...@@ -26,7 +31,14 @@ describe RepositoryCheck::BatchWorker do
projects[1].update_column(:last_repository_check_at, 2.months.ago) projects[1].update_column(:last_repository_check_at, 2.months.ago)
projects[2].update_column(:last_repository_check_at, 3.days.ago) projects[2].update_column(:last_repository_check_at, 3.days.ago)
expect(subject.perform).to eq([projects[1].id]) expect(subject.perform(shard_name)).to eq([projects[1].id])
end
it 'excludes projects on another shard' do
projects = create_list(:project, 2, created_at: 1.week.ago)
projects[0].update_column(:repository_storage, 'other')
expect(subject.perform(shard_name)).to eq([projects[1].id])
end end
it 'does nothing when repository checks are disabled' do it 'does nothing when repository checks are disabled' do
...@@ -34,13 +46,20 @@ describe RepositoryCheck::BatchWorker do ...@@ -34,13 +46,20 @@ describe RepositoryCheck::BatchWorker do
stub_application_setting(repository_checks_enabled: false) stub_application_setting(repository_checks_enabled: false)
expect(subject.perform).to eq(nil) expect(subject.perform(shard_name)).to eq(nil)
end
it 'does nothing when shard is unhealthy' do
shard_name = 'broken'
create(:project, created_at: 1.week.ago, repository_storage: shard_name)
expect(subject.perform(shard_name)).to eq(nil)
end end
it 'skips projects created less than 24 hours ago' do it 'skips projects created less than 24 hours ago' do
project = create(:project) project = create(:project)
project.update_column(:created_at, 23.hours.ago) project.update_column(:created_at, 23.hours.ago)
expect(subject.perform).to eq([]) expect(subject.perform(shard_name)).to eq([])
end end
end end
require 'spec_helper'

# Covers the per-shard fan-out performed by RepositoryCheck::DispatchWorker.
describe RepositoryCheck::DispatchWorker do
  subject(:worker) { described_class.new }

  it 'does nothing when repository checks are disabled' do
    stub_application_setting(repository_checks_enabled: false)

    expect(RepositoryCheck::BatchWorker).not_to receive(:perform_async)

    worker.perform
  end

  it 'dispatches work to RepositoryCheck::BatchWorker' do
    expect(RepositoryCheck::BatchWorker).to receive(:perform_async).at_least(:once)

    worker.perform
  end

  context 'with unhealthy shard' do
    let(:default_shard_name) { 'default' }
    let(:unhealthy_shard_name) { 'unhealthy' }
    let(:default_shard) { Gitlab::HealthChecks::Result.new(true, nil, shard: default_shard_name) }
    let(:unhealthy_shard) { Gitlab::HealthChecks::Result.new(false, '14:Connect Failed', shard: unhealthy_shard_name) }
    # One passing and one failing readiness result, as the Gitaly check is stubbed to report them.
    let(:readiness_results) { [default_shard, unhealthy_shard] }

    before do
      allow(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness).and_return(readiness_results)
    end

    it 'only triggers RepositoryCheck::BatchWorker for healthy shards' do
      expect(RepositoryCheck::BatchWorker).to receive(:perform_async).with('default')

      worker.perform
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment