Commit 7a0e51ca authored by Douglas Barbosa Alexandre

Merge branch '34324-scheduling-the-design-repositories-sync-backfill-part' into 'master'

Scheduling the design repositories sync

Closes #34324

See merge request gitlab-org/gitlab!21239
parents 8578a300 bb39376c
@@ -3,7 +3,7 @@
 module Geo
   class DesignRegistryFinder < RegistryFinder
     def count_syncable
-      GeoNode.find(current_node_id).projects.count_designs
+      GeoNode.find(current_node_id).projects.with_designs.count
     end

     def count_synced
@@ -25,7 +25,7 @@ module Geo
     def registries
       current_node
         .projects
-        .inner_join_design_management
+        .with_designs
         .inner_join_design_registry
     end
   end
...
# frozen_string_literal: true
#
# rubocop:disable CodeReuse/ActiveRecord

module Geo
  # Finder for retrieving unsynced designs that belong to a specific
  # shard using FDW queries.
  #
  # Basic usage:
  #
  #     Geo::DesignUnsyncedFinder
  #       .new(shard_name: 'default', batch_size: 1000)
  #       .execute.
  class DesignUnsyncedFinder
    def initialize(scheduled_project_ids: [], shard_name:, batch_size: nil)
      @current_node = Geo::Fdw::GeoNode.find(Gitlab::Geo.current_node.id)
      @scheduled_project_ids = scheduled_project_ids
      @shard_name = shard_name
      @batch_size = batch_size
    end

    def execute
      return Geo::Fdw::Project.none unless valid_shard?

      relation = projects
        .with_designs
        .missing_design_registry
        .within_shards(shard_name)
        .id_not_in(scheduled_project_ids)
        .reorder(last_repository_updated_at: :desc)

      relation = relation.limit(batch_size) unless batch_size.nil?

      relation.pluck_primary_key
    end

    private

    attr_reader :scheduled_project_ids, :current_node, :shard_name, :batch_size

    def projects
      return Geo::Fdw::Project.all if current_node.selective_sync_by_shards?

      current_node.projects
    end

    def valid_shard?
      return true unless current_node.selective_sync_by_shards?

      current_node.selective_sync_shards.include?(shard_name)
    end
  end
end
# frozen_string_literal: true

module Geo
  # Finder for retrieving designs updated recently that belong to a specific
  # shard using FDW queries.
  #
  # Basic usage:
  #
  #     Geo::DesignUpdatedRecentlyFinder
  #       .new(shard_name: 'default', batch_size: 1000)
  #       .execute.
  class DesignUpdatedRecentlyFinder
    def initialize(scheduled_project_ids: [], shard_name:, batch_size: nil)
      @current_node = Geo::Fdw::GeoNode.find(Gitlab::Geo.current_node.id)
      @scheduled_project_ids = scheduled_project_ids
      @shard_name = shard_name
      @batch_size = batch_size
    end

    # rubocop:disable CodeReuse/ActiveRecord
    def execute
      return Geo::Fdw::Project.none unless valid_shard?

      relation = projects
        .with_designs
        .recently_updated_designs
        .within_shards(shard_name)
        .id_not_in(scheduled_project_ids)
        .order('design_registry.last_synced_at ASC NULLS FIRST')

      relation = relation.limit(batch_size) unless batch_size.nil?

      relation.pluck_primary_key
    end
    # rubocop:enable CodeReuse/ActiveRecord

    private

    attr_reader :scheduled_project_ids, :current_node, :shard_name, :batch_size

    def projects
      return Geo::Fdw::Project.all if current_node.selective_sync_by_shards?

      current_node.projects
    end

    def valid_shard?
      return true unless current_node.selective_sync_by_shards?

      current_node.selective_sync_shards.include?(shard_name)
    end
  end
end
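Side by side, the two finders differ only in the registry condition they apply: one returns projects that have designs but no design_registry row on the tracking database yet, the other returns projects whose design_registry row is pending or due for a retry. A minimal console sketch on a Geo secondary with the FDW tables configured (shard name and return values are illustrative, not part of the change):

unsynced = Geo::DesignUnsyncedFinder
  .new(shard_name: 'default', batch_size: 1000)
  .execute # project IDs with designs but no design_registry entry

recently_updated = Geo::DesignUpdatedRecentlyFinder
  .new(shard_name: 'default', batch_size: 1000)
  .execute # project IDs whose design_registry entry is pending or retry-due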
@@ -144,6 +144,7 @@ module EE
       scope :aimed_for_deletion, -> (date) { where('marked_for_deletion_at <= ?', date).without_deleted }
       scope :with_repos_templates, -> { where(namespace_id: ::Gitlab::CurrentSettings.current_application_settings.custom_project_templates_group_id) }
       scope :with_groups_level_repos_templates, -> { joins("INNER JOIN namespaces ON projects.namespace_id = namespaces.custom_project_templates_group_id") }
+      scope :with_designs, -> { where(id: DesignManagement::Design.select(:project_id)) }

       delegate :shared_runners_minutes, :shared_runners_seconds, :shared_runners_seconds_last_reset,
         to: :statistics, allow_nil: true
@@ -195,19 +196,6 @@ module EE
         joins('LEFT JOIN services ON services.project_id = projects.id AND services.type = \'GitlabSlackApplicationService\' AND services.active IS true')
           .where('services.id IS NULL')
       end
-
-      def inner_join_design_management
-        join_statement =
-          arel_table
-            .join(DesignManagement::Design.arel_table, Arel::Nodes::InnerJoin)
-            .on(arel_table[:id].eq(DesignManagement::Design.arel_table[:project_id]))
-
-        joins(join_statement.join_sources)
-      end
-
-      def count_designs
-        inner_join_design_management.distinct.count
-      end
     end

     def can_store_security_reports?
...
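For reference, the new with_designs scope expresses the same "projects that have at least one design" condition as the removed Arel join, but as an IN subquery. A rough sketch of the SQL it should produce and how it replaces the removed counter (assuming the designs table is design_management_designs, per the DesignManagement::Design model; quoting is illustrative):

# Roughly:
#   SELECT "projects".* FROM "projects"
#   WHERE "projects"."id" IN (SELECT "design_management_designs"."project_id"
#                             FROM "design_management_designs")
Project.with_designs.count # replaces the removed Project.count_designs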
@@ -7,6 +7,7 @@ class Geo::DesignRegistry < Geo::BaseRegistry
   belongs_to :project

+  scope :pending, -> { with_state(:pending) }
   scope :failed, -> { with_state(:failed) }
   scope :synced, -> { with_state(:synced) }
   scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.now))) }
@@ -22,7 +23,7 @@ class Geo::DesignRegistry < Geo::BaseRegistry
   end

   before_transition any => :pending do |registry, _|
-    registry.retry_at = 0
+    registry.retry_at = nil
     registry.retry_count = 0
   end
@@ -50,6 +51,10 @@ class Geo::DesignRegistry < Geo::BaseRegistry
     designs_repositories
   end

+  def self.updated_recently
+    pending.or(failed.retry_due)
+  end
+
   def fail_sync!(message, error, attrs = {})
     new_retry_count = retry_count + 1
...
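Two notes on this file: retry_at is a timestamp, so resetting it to nil (rather than 0) on the transition back to pending is the correct sentinel, and the new updated_recently class method simply composes the state scopes. A rough sketch of the relation it builds, under the assumption that the state machine stores states in a string state column (so with_state(:pending) is essentially where(state: 'pending')):

# Roughly equivalent, under the assumption above:
Geo::DesignRegistry
  .where(state: 'pending')
  .or(
    Geo::DesignRegistry
      .where(state: 'failed')
      .where('retry_at IS NULL OR retry_at < ?', Time.now)
  )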
@@ -90,11 +90,24 @@ module Geo
           joins(join_statement.join_sources)
         end

-        def inner_join_design_management
+        def missing_design_registry
+          left_outer_join_design_registry
+            .where(Geo::DesignRegistry.arel_table[:project_id].eq(nil))
+        end
+
+        def recently_updated_designs
+          inner_join_design_registry
+            .merge(Geo::DesignRegistry.updated_recently)
+        end
+
+        def with_designs
+          design_table = Geo::Fdw::DesignManagementDesign.arel_table
+          design_subquery = design_table.project(design_table[:project_id]).distinct.as('sub_design_table')
+
           join_statement =
             arel_table
-              .join(Geo::Fdw::DesignManagementDesign.arel_table, Arel::Nodes::InnerJoin)
-              .on(arel_table[:id].eq(Geo::Fdw::DesignManagementDesign.arel_table[:project_id]))
+              .join(design_subquery, Arel::Nodes::InnerJoin)
+              .on(arel_table[:id].eq(design_subquery[:project_id]))

           joins(join_statement.join_sources)
         end
@@ -109,6 +122,15 @@ module Geo
           joins(join_statement.join_sources)
         end
+
+        def left_outer_join_design_registry
+          join_statement =
+            arel_table
+              .join(Geo::DesignRegistry.arel_table, Arel::Nodes::OuterJoin)
+              .on(arel_table[:id].eq(Geo::DesignRegistry.arel_table[:project_id]))
+
+          joins(join_statement.join_sources)
+        end
       end
     end
   end
...
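Taken together, these FDW scopes are what the two finders above chain. A minimal sketch of the unsynced case, using only methods that appear in this change (shard name illustrative, intended to run on a Geo secondary with the FDW schema available):

Geo::Fdw::Project
  .with_designs             # projects with at least one design (via the subquery join)
  .missing_design_registry  # no design_registry row on the tracking database yet
  .within_shards('default')
  .pluck_primary_key        # => batch of project IDs to schedule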
@@ -42,6 +42,7 @@
 - geo:geo_repository_cleanup
 - geo:geo_repository_destroy
 - geo:geo_repository_shard_sync
+- geo:geo_design_repository_shard_sync
 - geo:geo_repository_verification_primary_shard
 - geo:geo_repository_verification_primary_single
 - geo:geo_repository_verification_secondary_single
...
# frozen_string_literal: true

module Geo
  class DesignRepositoryShardSyncWorker < RepositoryShardSyncWorker
    private

    def schedule_job(project_id)
      job_id = Geo::DesignRepositorySyncWorker.perform_async(project_id)

      { project_id: project_id, job_id: job_id } if job_id
    end

    def find_project_ids_not_synced(batch_size:)
      Geo::DesignUnsyncedFinder
        .new(scheduled_project_ids: scheduled_project_ids, shard_name: shard_name, batch_size: batch_size)
        .execute
    end

    def find_project_ids_updated_recently(batch_size:)
      Geo::DesignUpdatedRecentlyFinder
        .new(scheduled_project_ids: scheduled_project_ids, shard_name: shard_name, batch_size: batch_size)
        .execute
    end
  end
end
@@ -8,6 +8,8 @@ module Geo
       else
         Geo::RepositoryShardSyncWorker.perform_async(shard_name)
       end
+
+      Geo::DesignRepositoryShardSyncWorker.perform_async(shard_name)
     end
   end
 end
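In effect, each healthy shard now gets a design-repository scheduling job alongside the existing repository one on every cycle; for example, for a shard named 'default' (illustrative value, taken from the branch shown above):

Geo::RepositoryShardSyncWorker.perform_async('default')        # project repositories
Geo::DesignRepositoryShardSyncWorker.perform_async('default')  # design repositories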
# frozen_string_literal: true

require 'spec_helper'

describe Geo::DesignUnsyncedFinder, :geo, :geo_fdw do
  include EE::GeoHelpers

  describe '#execute' do
    let(:node) { create(:geo_node) }
    let(:group_1) { create(:group) }
    let(:group_2) { create(:group) }
    let(:nested_group_1) { create(:group, parent: group_1) }
    let!(:project_1) { create(:project, group: group_1) }
    let!(:project_2) { create(:project, group: nested_group_1) }
    let!(:project_3) { create(:project, group: group_2) }
    let!(:project_4) { create(:project, group: group_1) }

    before do
      project_4.update_column(:repository_storage, 'foo')

      create(:design, project: project_1)
      create(:design, project: project_2)
      create(:design, project: project_3)
      create(:design, project: project_4)

      stub_current_geo_node(node)
    end

    subject { described_class.new(shard_name: 'default', batch_size: 100) }

    context 'without selective sync' do
      it 'returns designs without an entry on the tracking database' do
        create(:geo_design_registry, :synced, project: project_2)

        expect(subject.execute).to match_array([project_1.id, project_3.id])
      end
    end

    context 'with selective sync by namespace' do
      it 'returns designs that belong to the namespaces without an entry on the tracking database' do
        create(:geo_design_registry, :synced, project: project_4)

        node.update!(selective_sync_type: 'namespaces', namespaces: [group_1, nested_group_1])

        expect(subject.execute).to match_array([project_1.id, project_2.id])
      end
    end

    context 'with selective sync by shard' do
      before do
        node.update!(selective_sync_type: 'shards', selective_sync_shards: ['foo'])
      end

      it 'does not return designs out of synced shards' do
        subject = described_class.new(shard_name: 'default', batch_size: 100)

        expect(subject.execute).to be_empty
      end

      it 'returns designs that belong to the shards without an entry on the tracking database' do
        project_5 = create(:project, group: group_1)
        project_5.update_column(:repository_storage, 'foo')
        create(:design, project: project_5)

        create(:geo_design_registry, :synced, project: project_4)

        subject = described_class.new(shard_name: 'foo', batch_size: 100)

        expect(subject.execute).to match_array([project_5.id])
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

describe Geo::DesignUpdatedRecentlyFinder, :geo, :geo_fdw do
  include EE::GeoHelpers

  describe '#execute' do
    let(:node) { create(:geo_node) }
    let(:group_1) { create(:group) }
    let(:group_2) { create(:group) }
    let(:nested_group_1) { create(:group, parent: group_1) }
    let!(:project_1) { create(:project, group: group_1) }
    let!(:project_2) { create(:project, group: nested_group_1) }
    let!(:project_3) { create(:project, group: group_2) }
    let!(:project_4) { create(:project, group: group_1) }

    before do
      project_4.update_column(:repository_storage, 'foo')

      create(:geo_design_registry, project: project_1)
      create(:geo_design_registry, project: project_2)
      create(:geo_design_registry, :synced, project: project_3)
      create(:geo_design_registry, project: project_4)

      create(:design, project: project_1)
      create(:design, project: project_2)
      create(:design, project: project_3)
      create(:design, project: project_4)

      stub_current_geo_node(node)
    end

    subject { described_class.new(shard_name: 'default', batch_size: 100) }

    context 'without selective sync' do
      it 'returns designs with a dirty entry on the tracking database' do
        expect(subject.execute).to match_array([project_1.id, project_2.id])
      end
    end

    context 'with selective sync by namespace' do
      it 'returns designs that belong to the namespaces with a dirty entry on the tracking database' do
        node.update!(selective_sync_type: 'namespaces', namespaces: [group_1])

        expect(subject.execute).to match_array([project_1.id, project_2.id])
      end
    end

    context 'with selective sync by shard' do
      before do
        node.update!(selective_sync_type: 'shards', selective_sync_shards: ['foo'])
      end

      it 'does not return designs out of the selected shard' do
        subject = described_class.new(shard_name: 'default', batch_size: 100)

        expect(subject.execute).to be_empty
      end

      it 'returns designs that belong to the shards with a dirty entry on the tracking database' do
        project_5 = create(:project, group: group_1)
        project_5.update_column(:repository_storage, 'foo')
        create(:design, project: project_5)
        create(:geo_design_registry, :synced, project: project_5)

        subject = described_class.new(shard_name: 'foo', batch_size: 100)

        expect(subject.execute).to match_array([project_4.id])
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

describe Geo::DesignRepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_redis_cache do
  include ::EE::GeoHelpers
  include ExclusiveLeaseHelpers

  let!(:primary) { create(:geo_node, :primary) }
  let!(:secondary) { create(:geo_node) }

  let(:shard_name) { Gitlab.config.repositories.storages.keys.first }

  before do
    stub_current_geo_node(secondary)
  end

  describe '#perform' do
    let(:restricted_group) { create(:group) }

    let(:unsynced_project_in_restricted_group) { create(:project, group: restricted_group) }
    let(:unsynced_project) { create(:project) }

    before do
      stub_exclusive_lease(renew: true)

      Gitlab::ShardHealthCache.update([shard_name])

      create(:design, project: unsynced_project_in_restricted_group)
      create(:design, project: unsynced_project)
    end

    it 'performs Geo::DesignRepositorySyncWorker for each project' do
      expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)

      subject.perform(shard_name)
    end

    it 'performs Geo::DesignRepositorySyncWorker for designs where last attempt to sync failed' do
      create(:geo_design_registry, :sync_failed, project: unsynced_project_in_restricted_group)
      create(:geo_design_registry, :synced, project: unsynced_project)

      expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).once.and_return(spy)

      subject.perform(shard_name)
    end

    it 'does not perform Geo::DesignRepositorySyncWorker when shard becomes unhealthy' do
      Gitlab::ShardHealthCache.update([])

      expect(Geo::DesignRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'performs Geo::DesignRepositorySyncWorker for designs updated recently' do
      create(:geo_design_registry, project: unsynced_project_in_restricted_group)
      create(:geo_design_registry, :synced, project: unsynced_project)
      create(:geo_design_registry)

      expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)

      subject.perform(shard_name)
    end

    it 'does not schedule a job twice for the same project' do
      scheduled_jobs = [
        { job_id: 1, project_id: unsynced_project.id },
        { job_id: 2, project_id: unsynced_project_in_restricted_group.id }
      ]

      is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
      is_expected.not_to receive(:schedule_job)

      Sidekiq::Testing.inline! { subject.perform(shard_name) }
    end

    it 'does not perform Geo::DesignRepositorySyncWorker when no geo database is configured' do
      allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }

      expect(Geo::DesignRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)

      # We need to unstub here or the DatabaseCleaner will have issues since it
      # will appear as though the tracking DB were not available
      allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
    end

    it 'does not perform Geo::DesignRepositorySyncWorker when not running on a secondary' do
      allow(Gitlab::Geo).to receive(:secondary?) { false }

      expect(Geo::DesignRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'does not perform Geo::DesignRepositorySyncWorker when node is disabled' do
      allow_any_instance_of(GeoNode).to receive(:enabled?) { false }

      expect(Geo::DesignRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    context 'multiple shards' do
      it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
        expect(subject).to receive(:schedule_jobs).twice.and_call_original

        Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
        secondary.update!(repos_max_capacity: 5)

        subject.perform(shard_name)
      end
    end

    context 'when node has namespace restrictions', :request_store do
      before do
        secondary.update!(selective_sync_type: 'namespaces', namespaces: [restricted_group])

        allow(::Gitlab::Geo).to receive(:current_node).and_call_original
        Rails.cache.write(:current_node, secondary.to_json)
        allow(::GeoNode).to receive(:current_node).and_return(secondary)
      end

      it 'does not perform Geo::DesignRepositorySyncWorker for projects that do not belong to selected namespaces to replicate' do
        expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async)
          .with(unsynced_project_in_restricted_group.id)
          .once
          .and_return(spy)

        subject.perform(shard_name)
      end

      it 'does not perform Geo::DesignRepositorySyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
        create(:geo_design_registry, project: unsynced_project_in_restricted_group)
        create(:geo_design_registry, project: unsynced_project)

        expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async)
          .with(unsynced_project_in_restricted_group.id)
          .once
          .and_return(spy)

        subject.perform(shard_name)
      end
    end
  end
end
@@ -11,6 +11,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
   let!(:project_in_synced_group) { create(:project, group: synced_group) }
   let!(:unsynced_project) { create(:project) }
   let(:healthy_shard_name) { project_in_synced_group.repository.storage }
+  let(:design_worker) { Geo::DesignRepositoryShardSyncWorker }

   before do
     stub_current_geo_node(secondary)
@@ -32,6 +33,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
       end

       expect(worker).not_to receive(:perform_async).with('broken')
+      expect(design_worker).not_to receive(:perform_async).with('broken')

       subject.perform
     end
@@ -44,7 +46,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
         .and_return([result(true, healthy_shard_name), result(true, 'broken')])

       expect(worker).to receive(:perform_async).with('default')
+      expect(design_worker).to receive(:perform_async).with('default')
       expect(worker).not_to receive(:perform_async).with('broken')
+      expect(design_worker).not_to receive(:perform_async).with('broken')

       subject.perform
     end
@@ -61,7 +65,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
       stub_storage_settings({})

       expect(worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
+      expect(design_worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
       expect(worker).not_to receive(:perform_async).with('unknown')
+      expect(design_worker).not_to receive(:perform_async).with('unknown')

       subject.perform
     end
@@ -77,7 +83,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
         .and_return([result(true, healthy_shard_name), result(false, 'broken')])

       expect(worker).to receive(:perform_async).with(healthy_shard_name)
+      expect(design_worker).to receive(:perform_async).with(healthy_shard_name)
       expect(worker).not_to receive(:perform_async).with('broken')
+      expect(design_worker).not_to receive(:perform_async).with('broken')

       subject.perform
     end
...