Extract Geo::Secondary::BackfillWorker concern

parent 018a2d2c
# frozen_string_literal: true

module Geo
  module Secondary
    module BackfillWorker
      extend ActiveSupport::Concern

      LEASE_TIMEOUT = 60.minutes
      RUN_TIME = 60.minutes.to_i

      included do
        prepend ShadowMethods

        include ApplicationWorker
        include ExclusiveLeaseGuard
        include GeoQueue
        include ::Gitlab::Geo::LogHelpers
        include ::Gitlab::Utils::StrongMemoize

        attr_reader :shard_name, :start_time, :loops
      end

      module ShadowMethods
        def lease_key
          @lease_key ||= "#{self.class.name.underscore}:shard:#{shard_name}"
        end

        def lease_timeout
          LEASE_TIMEOUT
        end
      end

      private

      def base_log_data(message)
        super(message).merge(worker_metadata)
      end

      def healthy_node?
        unless Gitlab::Geo.geo_database_configured?
          log_info('Geo database not configured')
          return false
        end

        unless Gitlab::Geo.secondary?
          log_info('Current node not a secondary')
          return false
        end

        unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
          log_info("Shard (#{shard_name}) is not healthy")
          return false
        end

        true
      end

      # rubocop:disable Gitlab/ModuleWithInstanceVariables
      def node_enabled?
        # Only check every minute to avoid polling the DB excessively
        unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
          @last_enabled_check = Time.now.utc
          clear_memoization(:current_node_enabled)
        end

        strong_memoize(:current_node_enabled) do
          Gitlab::Geo.current_node_enabled?
        end
      end
      # rubocop:enable Gitlab/ModuleWithInstanceVariables

      def run_time
        RUN_TIME
      end

      def over_capacity?
        false
      end

      def over_time?
        (Time.now.utc - start_time) >= run_time
      end

      def worker_metadata
        { shard: shard_name }
      end
    end
  end
end
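
For orientation, here is a minimal sketch of how a worker might consume this concern. The ExampleBackfillWorker class and its loop body are hypothetical illustrations, not part of this commit; only Geo::Secondary::BackfillWorker and the helpers it provides (healthy_node?, node_enabled?, over_time?, over_capacity?, the shard-scoped lease key) come from the code above.

# Hypothetical consumer of the concern (not part of this commit). It shows
# the intended shape: set the state the concern's attr_readers expect, bail
# out early via healthy_node?, and do the work under the shard-scoped lease.
module Geo
  module Secondary
    class ExampleBackfillWorker
      include Geo::Secondary::BackfillWorker

      def perform(shard_name)
        @shard_name = shard_name
        @start_time = Time.now.utc
        @loops = 0

        return unless healthy_node?

        try_obtain_lease do
          loop do
            break if over_time?
            break unless node_enabled?

            # A real worker schedules sync jobs here and throttles itself by
            # overriding over_capacity? (the concern's default is false).
            sleep(1) if over_capacity?
            @loops += 1
          end
        end
      end
    end
  end
end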
@@ -3,14 +3,7 @@
 module Geo
   module Secondary
     class RepositoryBackfillWorker
-      include ApplicationWorker
-      include ExclusiveLeaseGuard
-      include GeoQueue
-      include ::Gitlab::Geo::LogHelpers
-      include ::Gitlab::Utils::StrongMemoize
-
-      LEASE_TIMEOUT = 60.minutes
-      RUN_TIME = 60.minutes.to_i
+      include Geo::Secondary::BackfillWorker

       def initialize
         @scheduled_jobs = []
@@ -21,20 +14,7 @@ module Geo
         @start_time = Time.now.utc
         @loops = 0

-        unless Gitlab::Geo.geo_database_configured?
-          log_info('Geo database not configured')
-          return
-        end
-
-        unless Gitlab::Geo.secondary?
-          log_info('Current node not a secondary')
-          return
-        end
-
-        unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
-          log_info("Shard (#{shard_name}) is not healthy")
-          return
-        end
+        return unless healthy_node?

         try_obtain_lease do
           log_info('Repository backfilling started')
@@ -53,15 +33,15 @@ module Geo
           # This will stream the results one by one
           # until there are no more results to fetch.
-          result = connection.get_result or break
+          result = connection.get_result
+          break :complete if result.nil?
+
           result.check

           result.each do |row|
             schedule_job(row['id'])
           end

-          if over_capacity?
-            sleep(1)
-          end
+          sleep(1) if over_capacity?
         end
       rescue => error
         reason = :error
@@ -75,31 +55,32 @@ module Geo
       private

-      attr_reader :shard_name, :start_time, :loops, :scheduled_jobs
-
-      def base_log_data(message)
-        super(message).merge(worker_metadata)
-      end
-
-      def lease_key
-        @lease_key ||= "#{self.class.name.underscore}:shard:#{shard_name}"
-      end
-
-      def lease_timeout
-        LEASE_TIMEOUT
-      end
-
-      def worker_metadata
-        { shard: shard_name }
-      end
+      attr_reader :scheduled_jobs

       def connection
-        strong_memoize(:connection) do
-          Geo::TrackingBase.connection.raw_connection
-        end
+        strong_memoize(:connection) { Geo::TrackingBase.connection.raw_connection }
       end

+      def max_capacity
+        # If we don't have a count, that means that for some reason
+        # RepositorySyncWorker stopped running/updating the cache. We might
+        # be trying to shut down Geo while this job may still be running.
+        healthy_count = healthy_shard_count
+
+        return 0 unless healthy_count > 0
+
+        capacity_per_shard = Gitlab::Geo.current_node.repos_max_capacity / healthy_count
+
+        [1, capacity_per_shard.to_i].max
+      end
+
+      def healthy_shard_count
+        Gitlab::ShardHealthCache.healthy_shard_count.to_i
+      end
+
+      def over_capacity?
+        scheduled_jobs.size >= max_capacity
+      end
+
+      # rubocop: disable CodeReuse/ActiveRecord
       def schedule_job(project_id)
         job_id = Geo::ProjectSyncWorker.perform_async(project_id, sync_repository: true, sync_wiki: true)
@@ -110,20 +91,8 @@ module Geo
           log_info("Repository could not be scheduled for backfilling", project_id: project_id)
         end
       end
+      # rubocop: enable CodeReuse/ActiveRecord

-      # rubocop: disable CodeReuse/ActiveRecord
-      def unsynced_projects_ids
-        Geo::ProjectUnsyncedFinder
-          .new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
-          .execute
-          .reorder(last_repository_updated_at: :desc)
-          .select(:id)
-      end
-      # rubocop: enable CodeReuse/ActiveRecord
-
       def update_jobs_in_progress
-        scheduled_job_ids = scheduled_jobs.map { |data| data[:job_id] }
         status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)

         # SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
@@ -132,42 +101,19 @@ module Geo
         @scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
       end

-      def max_capacity
-        healthy_count = Gitlab::ShardHealthCache.healthy_shard_count
-
-        # If we don't have a count, that means that for some reason
-        # RepositorySyncWorker stopped running/updating the cache. We might
-        # be trying to shut down Geo while this job may still be running.
-        return 0 unless healthy_count.to_i > 0
-
-        capacity_per_shard = Gitlab::Geo.current_node.repos_max_capacity / healthy_count
-
-        [1, capacity_per_shard.to_i].max
-      end
-
-      def node_enabled?
-        # Only check every minute to avoid polling the DB excessively
-        unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
-          @last_enabled_check = Time.now
-          clear_memoization(:current_node_enabled)
-        end
-
-        strong_memoize(:current_node_enabled) do
-          Gitlab::Geo.current_node_enabled?
-        end
-      end
-
-      def run_time
-        RUN_TIME
-      end
-
-      def over_capacity?
-        scheduled_jobs.size >= max_capacity
-      end
-
-      def over_time?
-        (Time.now.utc - start_time) >= run_time
+      def scheduled_job_ids
+        scheduled_jobs.map { |data| data[:job_id] }
+      end
+
+      # rubocop: disable CodeReuse/ActiveRecord
+      def unsynced_projects_ids
+        Geo::ProjectUnsyncedFinder
+          .new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
+          .execute
+          .reorder(last_repository_updated_at: :desc)
+          .select(:id)
       end
+      # rubocop: enable CodeReuse/ActiveRecord
     end
   end
 end
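
The capacity handling above is what the updated spec exercises: max_capacity divides the node's repos_max_capacity evenly across healthy shards, and over_capacity? compares the number of scheduled jobs against that per-shard budget. A rough sketch of the arithmetic, using the values the new spec stubs (repos_max_capacity: 5, five healthy shards); the variable names are illustrative, not the production code:

# Illustrative arithmetic only, using the values stubbed in the new spec.
repos_max_capacity  = 5   # from create(:geo_node, repos_max_capacity: 5)
healthy_shard_count = 5   # from stub_healthy_shards([shard_name, 'shard2', ...])

capacity_per_shard = repos_max_capacity / healthy_shard_count  # => 1
max_capacity       = [1, capacity_per_shard.to_i].max          # => 1

# With a per-shard budget of 1, every scheduled job puts the worker at or
# over capacity, so it sleeps once per loop iteration; scheduling two
# projects therefore results in two sleeps, which the new spec asserts.
scheduled_jobs = [{ job_id: 'jid-123' }]
scheduled_jobs.size >= max_capacity  # => true, i.e. over_capacity?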
@@ -6,12 +6,12 @@ describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab
   include EE::GeoHelpers

   let(:primary) { create(:geo_node, :primary) }
-  let(:secondary) { create(:geo_node) }
+  let(:secondary) { create(:geo_node, repos_max_capacity: 5) }
   let(:shard_name) { Gitlab.config.repositories.storages.keys.first }

   before do
     stub_current_geo_node(secondary)
-    stub_shard_healthy(shard_name, true)
+    stub_healthy_shards(shard_name)
   end

   describe '#perform' do
@@ -35,7 +35,7 @@ describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab
     end

     it 'does not schedule jobs when shard is not healthy' do
-      stub_shard_healthy(shard_name, false)
+      stub_healthy_shards([])

       create(:project)

       expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
@@ -68,5 +68,15 @@ describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab

       subject.perform(shard_name)
     end
+
+    it 'respects Geo secondary node max capacity per shard' do
+      stub_healthy_shards([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
+      allow(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return('jid-123')
+
+      create_list(:project, 2)
+
+      expect(subject).to receive(:sleep).twice.and_call_original
+      subject.perform(shard_name)
+    end
   end
 end
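
The spec now stubs shard health through a stub_healthy_shards helper that accepts the full list of healthy shards, so that healthy_shard? and healthy_shard_count stay consistent. The helper's definition is not part of this excerpt; a minimal sketch of what such a helper could look like in EE::GeoHelpers, assuming it stubs Gitlab::ShardHealthCache:

# Hypothetical spec helper, assumed rather than shown in this excerpt.
def stub_healthy_shards(shards)
  shards = Array(shards)

  allow(Gitlab::ShardHealthCache).to receive(:healthy_shard_count).and_return(shards.size)
  allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?) do |shard_name|
    shards.include?(shard_name)
  end
end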