Commit 99b64cce authored by Ash McKenzie

Merge branch 'da-poc-streaming-results' into 'master'

Use streaming results instead of OFFSET/LIMIT on Geo backfilling workers

See merge request gitlab-org/gitlab-ee!15457
parents facdbd2b 7a574e5a
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
# .execute. # .execute.
module Geo module Geo
class ProjectUnsyncedFinder class ProjectUnsyncedFinder
def initialize(current_node:, shard_name:, batch_size:) def initialize(current_node:, shard_name:, batch_size: nil)
@current_node = Geo::Fdw::GeoNode.find(current_node.id) @current_node = Geo::Fdw::GeoNode.find(current_node.id)
@shard_name = shard_name @shard_name = shard_name
@batch_size = batch_size @batch_size = batch_size
...@@ -20,10 +20,9 @@ module Geo ...@@ -20,10 +20,9 @@ module Geo
def execute def execute
return Geo::Fdw::Project.none unless valid_shard? return Geo::Fdw::Project.none unless valid_shard?
projects relation = projects.missing_project_registry.within_shards(shard_name)
.missing_project_registry relation = relation.limit(batch_size) unless batch_size.nil?
.within_shards(shard_name) relation
.limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
# .execute. # .execute.
module Geo module Geo
class ProjectUpdatedRecentlyFinder class ProjectUpdatedRecentlyFinder
def initialize(current_node:, shard_name:, batch_size:) def initialize(current_node:, shard_name:, batch_size: nil)
@current_node = Geo::Fdw::GeoNode.find(current_node.id) @current_node = Geo::Fdw::GeoNode.find(current_node.id)
@shard_name = shard_name @shard_name = shard_name
@batch_size = batch_size @batch_size = batch_size
...@@ -20,10 +20,9 @@ module Geo ...@@ -20,10 +20,9 @@ module Geo
def execute def execute
return Geo::Fdw::Project.none unless valid_shard? return Geo::Fdw::Project.none unless valid_shard?
projects relation = projects.recently_updated.within_shards(shard_name)
.recently_updated relation = relation.limit(batch_size) unless batch_size.nil?
.within_shards(shard_name) relation
.limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
- geo:geo_project_sync - geo:geo_project_sync
- geo:geo_container_repository_sync - geo:geo_container_repository_sync
- geo:geo_rename_repository - geo:geo_rename_repository
- geo:geo_secondary_repository_backfill
- geo:geo_repositories_clean_up - geo:geo_repositories_clean_up
- geo:geo_repository_cleanup - geo:geo_repository_cleanup
- geo:geo_repository_destroy - geo:geo_repository_destroy
......
# frozen_string_literal: true
module Geo
module Secondary
module BackfillWorker
extend ActiveSupport::Concern
LEASE_TIMEOUT = 60.minutes
RUN_TIME = 60.minutes.to_i
# Evaluated in the context of every worker class that includes this concern.
included do
# Prepended (rather than included) so that ShadowMethods#lease_key and
# ShadowMethods#lease_timeout take precedence over same-named methods
# coming from the including class or from the modules included below.
prepend ShadowMethods
include ApplicationWorker
include ExclusiveLeaseGuard
include GeoQueue
include ::Gitlab::Geo::LogHelpers
include ::Gitlab::Utils::StrongMemoize
# A failed backfill run is not retried by Sidekiq.
sidekiq_options retry: false
# shard_name and start_time are assigned in #perform; loops is initialised
# to 0 in #initialize.
attr_reader :shard_name, :start_time, :loops
end
# Lease settings for backfill workers. The concern prepends this module so
# these definitions win over any same-named methods of the including class.
module ShadowMethods
  # Lease key built from the underscored worker class name and the shard
  # being processed. Computed once per worker instance.
  def lease_key
    return @lease_key if @lease_key

    worker_name = self.class.name.underscore
    @lease_key = "#{worker_name}:shard:#{shard_name}"
  end

  # Lease duration used for a backfill run.
  def lease_timeout
    LEASE_TIMEOUT
  end
end
# Sets up per-instance bookkeeping: a zeroed loop counter and an empty list
# of scheduled jobs.
def initialize
  @loops = 0
  @scheduled_jobs = []
end
# rubocop:disable Gitlab/ModuleWithInstanceVariables
# Sidekiq entry point.
#
# Records the shard and the start time, then runs schedule_jobs under the
# exclusive lease, but only when healthy_node? passes. Returns nil without
# taking the lease otherwise.
def perform(shard_name)
  @shard_name = shard_name
  @start_time = Time.now.utc

  try_obtain_lease { schedule_jobs } if healthy_node?
end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
private
# Extends the inherited log payload for `message` with this worker's
# metadata (see worker_metadata).
def base_log_data(message)
  log_data = super(message)
  log_data.merge(worker_metadata)
end
# Checks, in order, that the Geo database is configured, that the current
# node is a secondary, and that the shard is reported healthy.
#
# Returns true when every check passes. On the first failing check the
# reason is logged through log_info and false is returned; later checks are
# not evaluated.
def healthy_node?
  failure =
    if !Gitlab::Geo.geo_database_configured?
      'Geo database not configured'
    elsif !Gitlab::Geo.secondary?
      'Current node not a secondary'
    elsif !Gitlab::ShardHealthCache.healthy_shard?(shard_name)
      "Shard (#{shard_name}) is not healthy"
    end

  return true unless failure

  log_info(failure)
  false
end
# rubocop:disable Gitlab/ModuleWithInstanceVariables
# Whether the current Geo node is enabled, as reported by
# Gitlab::Geo.current_node_enabled?. The answer is memoized and only
# refreshed when the previous check is missing or at least a minute old.
def node_enabled?
  # Only check every minute to avoid polling the DB excessively
  refresh_due = !(@last_enabled_check.present? && @last_enabled_check > 1.minute.ago)

  if refresh_due
    @last_enabled_check = Time.now.utc
    clear_memoization(:current_node_enabled)
  end

  strong_memoize(:current_node_enabled) { Gitlab::Geo.current_node_enabled? }
end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
# Maximum duration of a single run, compared against elapsed seconds in
# over_time?.
def run_time
  RUN_TIME
end

# Default capacity check: never over capacity. Including workers may
# override this.
def over_capacity?
  false
end

# True once the time elapsed since start_time reaches run_time.
def over_time?
  elapsed = Time.now.utc - start_time
  elapsed >= run_time
end

# Metadata merged into every log entry of this worker.
def worker_metadata
  { shard: shard_name }
end
end
end
end
...@@ -3,7 +3,11 @@ ...@@ -3,7 +3,11 @@
module Geo module Geo
class RepositorySyncWorker < Geo::Scheduler::Secondary::PerShardSchedulerWorker class RepositorySyncWorker < Geo::Scheduler::Secondary::PerShardSchedulerWorker
def schedule_job(shard_name) def schedule_job(shard_name)
RepositoryShardSyncWorker.perform_async(shard_name) if ::Feature.enabled?(:geo_streaming_results_repository_sync)
Geo::Secondary::RepositoryBackfillWorker.perform_async(shard_name)
else
Geo::RepositoryShardSyncWorker.perform_async(shard_name)
end
end end
end end
end end
# frozen_string_literal: true
module Geo
module Secondary
class RepositoryBackfillWorker
include Geo::Secondary::BackfillWorker
private
attr_reader :scheduled_jobs
# Raw connection underlying Geo::TrackingBase's connection, memoized for
# the lifetime of this worker instance.
def connection
  strong_memoize(:connection) do
    tracking_connection = Geo::TrackingBase.connection
    tracking_connection.raw_connection
  end
end
# Number of sync jobs this shard may have in flight: the node's
# repos_max_capacity divided evenly across healthy shards, but never less
# than 1. Returns 0 when no healthy shard is counted.
def max_capacity
  shard_count = healthy_shard_count

  # If we don't have a count, that means that for some reason
  # RepositorySyncWorker stopped running/updating the cache. We might
  # be trying to shut down Geo while this job may still be running.
  return 0 unless shard_count.positive?

  per_shard = Gitlab::Geo.current_node.repos_max_capacity / shard_count
  [per_shard.to_i, 1].max
end
# Healthy shard count from Gitlab::ShardHealthCache, coerced with to_i.
def healthy_shard_count
  cached_count = Gitlab::ShardHealthCache.healthy_shard_count
  cached_count.to_i
end

# True when the number of tracked scheduled jobs has reached max_capacity.
def over_capacity?
  in_flight = scheduled_jobs.size
  in_flight >= max_capacity
end
# Streams the ids of projects that need a sync and schedules one job per id.
#
# The unsynced and recently-updated finder queries are sent together as one
# multi-statement query with single-row mode enabled, and results are read
# one at a time. Each pass of the loop first checks that the node is still
# enabled, the run is within its time budget and the lease can be renewed;
# when the worker is over capacity it sleeps for a second instead of
# reading the next result.
#
# Returns the reason the loop stopped (:node_disabled, :over_time,
# :lease_lost or :complete). Any StandardError is logged and re-raised.
# Start and finish are always logged, the latter with the loop count,
# duration and stop reason.
def schedule_jobs
  log_info('Repository backfilling started')
  reason = :unknown

  begin
    connection.send_query("#{projects_ids_unsynced.to_sql};#{project_ids_updated_recently.to_sql}")
    connection.set_single_row_mode

    reason = loop do
      break :node_disabled unless node_enabled?
      break :over_time if over_time?
      break :lease_lost unless renew_lease!

      # Count every pass that clears the guards so the `total_loops` value
      # logged on completion reflects the work done instead of staying at 0.
      @loops += 1

      update_jobs_in_progress

      if over_capacity?
        sleep(1)
      else
        # This will stream the results one by one
        # until there are no more results to fetch.
        result = connection.get_result
        break :complete if result.nil?

        result.check
        result.each { |row| schedule_job(row['id']) }
      end
    end
  rescue => error
    reason = :error
    log_error('Repository backfilling error', error)
    raise
  ensure
    # NOTE(review): when the loop stops for any reason other than :complete,
    # results of the in-flight query are left unread on `connection` --
    # confirm the connection is drained or discarded before it is reused.
    log_info('Repository backfilling finished', total_loops: loops, duration: Time.now.utc - start_time, reason: reason)
  end
end
# Enqueues a Geo::ProjectSyncWorker job (repository and wiki) for
# `project_id`. When a job id is returned it is recorded in the list of
# scheduled jobs; either outcome is logged.
def schedule_job(project_id)
  job_id = Geo::ProjectSyncWorker.perform_async(project_id, sync_repository: true, sync_wiki: true)

  unless job_id
    return log_info("Repository could not be scheduled for backfilling", project_id: project_id)
  end

  @scheduled_jobs << { job_id: job_id }
  log_info("Repository scheduled for backfilling", project_id: project_id, job_id: job_id)
end
# Job ids of all tracked scheduled jobs, in scheduling order.
def scheduled_job_ids
  scheduled_jobs.each_with_object([]) { |job, ids| ids << job[:job_id] }
end
# Drops finished jobs from the list of scheduled jobs.
#
# Does nothing (and returns nil) when no jobs are tracked. Otherwise asks
# Gitlab::SidekiqStatus for the status of every tracked job id and keeps
# only the entries whose status is truthy.
def update_jobs_in_progress
  job_ids = scheduled_job_ids
  return if job_ids.empty?

  # SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
  statuses = Gitlab::SidekiqStatus.job_status(job_ids)

  # Pair each { job_id: 123 } entry with its status and keep the running ones.
  @scheduled_jobs = @scheduled_jobs.zip(statuses).select { |_job, running| running }.map(&:first)
end
# rubocop: disable CodeReuse/ActiveRecord
# Relation selecting the ids of this shard's unsynced projects (as found by
# Geo::ProjectUnsyncedFinder), most recently updated repositories first.
# No batch size is passed to the finder.
def projects_ids_unsynced
  finder = Geo::ProjectUnsyncedFinder.new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
  unsynced = finder.execute

  unsynced
    .reorder(last_repository_updated_at: :desc)
    .select(:id)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
# Relation selecting the ids of this shard's recently updated projects (as
# found by Geo::ProjectUpdatedRecentlyFinder), least recently synced first
# with never-synced rows leading, then by oldest repository update.
# No batch size is passed to the finder.
def project_ids_updated_recently
  finder = Geo::ProjectUpdatedRecentlyFinder.new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
  updated_recently = finder.execute

  updated_recently
    .order('project_registry.last_repository_synced_at ASC NULLS FIRST, projects.last_repository_updated_at ASC')
    .select(:id)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
...@@ -21,8 +21,26 @@ module EE ...@@ -21,8 +21,26 @@ module EE
allow(::Gitlab::Geo).to receive(:secondary?).and_return(true) allow(::Gitlab::Geo).to receive(:secondary?).and_return(true)
end end
def stub_node_disabled(node)
allow(node).to receive(:enabled?).and_return(false)
end
def stub_selective_sync(node, value) def stub_selective_sync(node, value)
allow(node).to receive(:selective_sync?).and_return(value) allow(node).to receive(:selective_sync?).and_return(value)
end end
def stub_healthy_shards(shards)
::Gitlab::ShardHealthCache.update(Array(shards))
end
def with_no_geo_database_configured(&block)
allow(::Gitlab::Geo).to receive(:geo_database_configured?).and_return(false)
yield
# We need to unstub here or the DatabaseCleaner will have issues since it
# will appear as though the tracking DB were not available
allow(::Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
end
end end
end end
...@@ -8,11 +8,8 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -8,11 +8,8 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
let!(:synced_group) { create(:group) } let!(:synced_group) { create(:group) }
let!(:project_in_synced_group) { create(:project, group: synced_group) } let!(:project_in_synced_group) { create(:project, group: synced_group) }
let!(:unsynced_project) { create(:project) } let!(:unsynced_project) { create(:project) }
let(:healthy_shard_name) { project_in_synced_group.repository.storage } let(:healthy_shard_name) { project_in_synced_group.repository.storage }
subject { described_class.new }
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
end end
...@@ -21,7 +18,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -21,7 +18,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
Sidekiq::Testing.inline! { example.run } Sidekiq::Testing.inline! { example.run }
end end
describe '#perform' do shared_examples '#perform' do |worker|
context 'additional shards' do context 'additional shards' do
it 'skips backfill for repositories on other shards' do it 'skips backfill for repositories on other shards' do
create(:project, :broken_storage, group: synced_group) create(:project, :broken_storage, group: synced_group)
...@@ -32,7 +29,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -32,7 +29,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
raise GRPC::Unavailable.new('No Gitaly available') raise GRPC::Unavailable.new('No Gitaly available')
end end
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken') expect(worker).not_to receive(:perform_async).with('broken')
subject.perform subject.perform
end end
...@@ -44,8 +41,8 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -44,8 +41,8 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness) expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard_name), result(true, 'broken')]) .and_return([result(true, healthy_shard_name), result(true, 'broken')])
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with('default') expect(worker).to receive(:perform_async).with('default')
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken') expect(worker).not_to receive(:perform_async).with('broken')
subject.perform subject.perform
end end
...@@ -61,8 +58,8 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -61,8 +58,8 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
# hide the 'broken' storage for this spec # hide the 'broken' storage for this spec
stub_storage_settings({}) stub_storage_settings({})
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage) expect(worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('unknown') expect(worker).not_to receive(:perform_async).with('unknown')
subject.perform subject.perform
end end
...@@ -77,14 +74,30 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -77,14 +74,30 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness) expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard_name), result(false, 'broken')]) .and_return([result(true, healthy_shard_name), result(false, 'broken')])
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(healthy_shard_name) expect(worker).to receive(:perform_async).with(healthy_shard_name)
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken') expect(worker).not_to receive(:perform_async).with('broken')
subject.perform subject.perform
end end
end end
end end
context 'when geo_streaming_results_repository_sync flag is enabled', :geo_fdw do
before do
stub_feature_flags(geo_streaming_results_repository_sync: true)
end
include_examples '#perform', Geo::Secondary::RepositoryBackfillWorker
end
context 'when geo_streaming_results_repository_sync flag is disabled' do
before do
stub_feature_flags(geo_streaming_results_repository_sync: false)
end
include_examples '#perform', Geo::RepositoryShardSyncWorker
end
def result(success, shard) def result(success, shard)
Gitlab::HealthChecks::Result.new(success, nil, { shard: shard }) Gitlab::HealthChecks::Result.new(success, nil, { shard: shard })
end end
......
...@@ -167,7 +167,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_fdw, :r ...@@ -167,7 +167,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_fdw, :r
end end
it 'does not schedule jobs when not running on a secondary' do it 'does not schedule jobs when not running on a secondary' do
allow(Gitlab::Geo).to receive(:primary?) { false } allow(Gitlab::Geo).to receive(:secondary?) { false }
expect(secondary_single_worker).not_to receive(:perform_async) expect(secondary_single_worker).not_to receive(:perform_async)
......
# frozen_string_literal: true
require 'spec_helper'
# Specs for the streaming-results repository backfill worker. Every example
# runs as the secondary node with `shard_name` marked healthy unless the
# example overrides one of those stubs.
describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab_redis_cache do
include EE::GeoHelpers
let(:primary) { create(:geo_node, :primary) }
# repos_max_capacity: 5 is what the capacity example below divides across shards.
let(:secondary) { create(:geo_node, repos_max_capacity: 5) }
let(:shard_name) { Gitlab.config.repositories.storages.keys.first }
before do
stub_current_geo_node(secondary)
stub_healthy_shards(shard_name)
end
# Pins the full Sidekiq options hash (retry, queue and queue namespace),
# not only the retry setting.
it 'disables Sidekiq retries' do
expect(subject.sidekiq_options_hash).to eq(
'retry' => false,
'queue' => 'geo:geo_secondary_repository_backfill',
'queue_namespace' => :geo
)
end
describe '#perform' do
# The first three examples each trip one of the worker's pre-flight checks.
it 'does not schedule jobs when Geo database is not configured' do
create(:project)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
with_no_geo_database_configured do
subject.perform(shard_name)
end
end
it 'does not schedule jobs when not running on a Geo secondary node' do
stub_current_geo_node(primary)
create(:project)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'does not schedule jobs when shard is not healthy' do
stub_healthy_shards([])
create(:project)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
# NOTE(review): presumably this reaches the worker through its
# node-enabled check inside the scheduling loop -- confirm.
it 'does not schedule jobs when the Geo secondary node is disabled' do
stub_node_disabled(secondary)
create(:project)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'does not schedule jobs for projects on other shards' do
project = create(:project)
# update_column writes the storage name directly so the project no longer
# belongs to `shard_name`.
project.update_column(:repository_storage, 'other')
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'schedules a job for each unsynced project' do
create_list(:project, 2)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(true)
subject.perform(shard_name)
end
# Only the :sync_failed registry is expected to produce a job; the :synced one is not.
it 'schedules a job for each project where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed)
create(:geo_project_registry, :synced)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(true)
subject.perform(shard_name)
end
# The repository-dirty and wiki-dirty registries are expected to produce a
# job each; the plain :synced one is not.
it 'schedules a job for each synced project updated recently' do
create(:geo_project_registry, :synced, :repository_dirty)
create(:geo_project_registry, :synced)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(true)
subject.perform(shard_name)
end
# Five healthy shards against repos_max_capacity: 5 leaves a per-shard
# capacity of 1, so the worker is expected to sleep exactly once while the
# first scheduled job is still reported as running.
it 'respects Geo secondary node max capacity per shard' do
stub_healthy_shards([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
project_1 = create(:project)
project_2 = create(:project)
allow(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_1.id, anything).and_return('jid-1')
allow(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_2.id, anything).and_return('jid-2')
# jid-2 is reported as running on the first status check and finished on
# the next; jid-1 is reported as finished.
allow(Gitlab::SidekiqStatus).to receive(:job_status).with(['jid-2']).and_return([true], [false])
allow(Gitlab::SidekiqStatus).to receive(:job_status).with(['jid-1']).and_return([false])
# and_call_original means the real one-second sleep runs in this example.
expect(subject).to receive(:sleep).once.and_call_original
subject.perform(shard_name)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment