Commit bf7fffdb authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre Committed by Mayra Cabrera

Add method to find registries differences for projects

This method is used to find untracked projects that
we need to sync and unused registries that we can
remove.
parent fc4d4d5f
# frozen_string_literal: true
module Geo
class ProjectRegistryFinder
# Returns ProjectRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_dirty_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::ProjectRegistry
.never_synced
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_dirty_registries(batch_size:, except_ids: [])
Geo::ProjectRegistry
.dirty
.retry_due
.model_id_not_in(except_ids)
.order(Gitlab::Database.nulls_first_order(:last_repository_synced_at))
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
end
end
......@@ -11,6 +11,10 @@ class Geo::BaseRegistry < Geo::TrackingBase
where(self::MODEL_FOREIGN_KEY => range).pluck(self::MODEL_FOREIGN_KEY)
end
def self.pluck_model_foreign_key
where(nil).pluck(self::MODEL_FOREIGN_KEY)
end
def self.model_id_in(ids)
where(self::MODEL_FOREIGN_KEY => ids)
end
......
# frozen_string_literal: true
class Geo::DeletedProject
attr_reader :id, :name, :disk_path
include ActiveModel::Validations
attr_accessor :id, :name, :disk_path
validates :id, :name, :disk_path, presence: true
def initialize(id:, name:, disk_path:, repository_storage:)
@id = id
......
......@@ -3,6 +3,9 @@
class Geo::DesignRegistry < Geo::BaseRegistry
include ::Delay
MODEL_CLASS = ::Project
MODEL_FOREIGN_KEY = :project_id
RETRIES_BEFORE_REDOWNLOAD = 5
belongs_to :project
......
......@@ -5,6 +5,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
include ::EachBatch
include ::ShaAttribute
MODEL_CLASS = ::Project
MODEL_FOREIGN_KEY = :project_id
REGISTRY_TYPES = %i{repository wiki}.freeze
RETRIES_BEFORE_REDOWNLOAD = 5
......@@ -39,6 +42,34 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
where(nil).pluck(:project_id)
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_project_registry_ssot_sync)
end
def self.has_create_events?
true
end
def self.find_registry_differences(range)
source_ids = Gitlab::Geo.current_node.projects.id_in(range).pluck_primary_key
tracked_ids = self.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end
def self.delete_worker_class
::GeoRepositoryDestroyWorker
end
def self.delete_for_model_ids(project_ids)
project_ids.map do |project_id|
delete_worker_class.perform_async(project_id)
end
end
def self.failed
repository_sync_failed = arel_table[:repository_retry_count].gt(0)
wiki_sync_failed = arel_table[:wiki_retry_count].gt(0)
......
......@@ -3,10 +3,14 @@
module Geo
class RepositoryDestroyService
include ::Gitlab::Geo::LogHelpers
include ::Gitlab::Utils::StrongMemoize
attr_reader :id, :name, :disk_path, :repository_storage
def initialize(id, name, disk_path, repository_storage)
# There is a possibility that the replicable's record does not exist
# anymore. In this case, you need to pass the optional parameters
# explicitly.
def initialize(id, name = nil, disk_path = nil, repository_storage = nil)
@id = id
@name = name
@disk_path = disk_path
......@@ -29,25 +33,36 @@ module Geo
private
def destroy_project
::Projects::DestroyService.new(deleted_project, nil).geo_replicate
# We should skip if we had to rebuild the project, but we don't
# have the information that our service class requires.
return if project.is_a?(Geo::DeletedProject) && !project.valid?
::Projects::DestroyService.new(project, nil).geo_replicate
end
# rubocop: disable CodeReuse/ActiveRecord
def destroy_registry_entries
::Geo::ProjectRegistry.where(project_id: id).delete_all
::Geo::DesignRegistry.where(project_id: id).delete_all
::Geo::ProjectRegistry.model_id_in(id).delete_all
::Geo::DesignRegistry.model_id_in(id).delete_all
log_info("Registry entries removed", project_id: id)
log_info('Registry entries removed', project_id: id)
end
# rubocop: enable CodeReuse/ActiveRecord
def deleted_project
# We don't have access to the original model anymore, so we are
# rebuilding only what our service class requires
::Geo::DeletedProject.new(id: id,
name: name,
disk_path: disk_path,
repository_storage: repository_storage)
def project
strong_memoize(:project) do
Project.find(id)
rescue ActiveRecord::RecordNotFound => e
# When cleaning up project/registries, there are some cases where
# the replicable record does not exist anymore. So, we try to
# rebuild it with only what our service class requires.
log_error('Could not find project', e.message)
::Geo::DeletedProject.new(
id: id,
name: name,
disk_path: disk_path,
repository_storage: repository_storage
)
end
end
end
end
......@@ -10,15 +10,15 @@ module Geo
{ project_id: project_id, job_id: job_id } if job_id
end
def find_project_ids_not_synced(batch_size:)
def find_project_ids_not_synced(except_ids:, batch_size:)
Geo::DesignUnsyncedFinder
.new(scheduled_project_ids: scheduled_project_ids, shard_name: shard_name, batch_size: batch_size)
.new(scheduled_project_ids: except_ids, shard_name: shard_name, batch_size: batch_size)
.execute
end
def find_project_ids_updated_recently(batch_size:)
def find_project_ids_updated_recently(except_ids:, batch_size:)
Geo::DesignUpdatedRecentlyFinder
.new(scheduled_project_ids: scheduled_project_ids, shard_name: shard_name, batch_size: batch_size)
.new(scheduled_project_ids: except_ids, shard_name: shard_name, batch_size: batch_size)
.execute
end
end
......
......@@ -62,22 +62,31 @@ module Geo
end
def load_pending_resources
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size)
return [] unless valid_shard?
resources = find_project_ids_not_synced(except_ids: scheduled_project_ids, batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero?
resources
else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity)
resources + find_project_ids_updated_recently(except_ids: scheduled_project_ids + resources, batch_size: remaining_capacity)
end
end
# rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_not_synced(batch_size:)
find_unsynced_projects(batch_size: batch_size)
.id_not_in(scheduled_project_ids)
.reorder(last_repository_updated_at: :desc)
.pluck_primary_key
def find_project_ids_not_synced(except_ids:, batch_size:)
if Geo::ProjectRegistry.registry_consistency_worker_enabled?
project_ids =
find_never_synced_project_ids(batch_size: batch_size, except_ids: except_ids)
find_project_ids_within_shard(project_ids, direction: :desc)
else
find_unsynced_projects(batch_size: batch_size)
.id_not_in(except_ids)
.reorder(last_repository_updated_at: :desc)
.pluck_primary_key
end
end
# rubocop: enable CodeReuse/ActiveRecord
......@@ -88,11 +97,18 @@ module Geo
end
# rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_updated_recently(batch_size:)
find_projects_updated_recently(batch_size: batch_size)
.id_not_in(scheduled_project_ids)
.order('project_registry.last_repository_synced_at ASC NULLS FIRST, projects.last_repository_updated_at ASC')
.pluck_primary_key
def find_project_ids_updated_recently(except_ids:, batch_size:)
if Geo::ProjectRegistry.registry_consistency_worker_enabled?
project_ids =
find_retryable_dirty_project_ids(batch_size: batch_size, except_ids: except_ids)
find_project_ids_within_shard(project_ids, direction: :asc)
else
find_projects_updated_recently(batch_size: batch_size)
.id_not_in(except_ids)
.order('project_registry.last_repository_synced_at ASC NULLS FIRST, projects.last_repository_updated_at ASC')
.pluck_primary_key
end
end
# rubocop: enable CodeReuse/ActiveRecord
......@@ -101,5 +117,37 @@ module Geo
.new(current_node: current_node, shard_name: shard_name, batch_size: batch_size)
.execute
end
def valid_shard?
return true unless current_node.selective_sync_by_shards?
current_node.selective_sync_shards.include?(shard_name)
end
def find_never_synced_project_ids(batch_size:, except_ids:)
registry_finder
.find_never_synced_registries(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key
end
def find_retryable_dirty_project_ids(batch_size:, except_ids:)
registry_finder
.find_retryable_dirty_registries(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key
end
# rubocop:disable CodeReuse/ActiveRecord
def find_project_ids_within_shard(project_ids, direction:)
Project
.id_in(project_ids)
.within_shards(shard_name)
.reorder(last_repository_updated_at: direction)
.pluck_primary_key
end
# rubocop:enable CodeReuse/ActiveRecord
def registry_finder
@registry_finder ||= Geo::ProjectRegistryFinder.new
end
end
end
......@@ -18,8 +18,9 @@ module Geo
REGISTRY_CLASSES = [
Geo::JobArtifactRegistry,
Geo::LfsObjectRegistry,
Geo::UploadRegistry,
Geo::PackageFileRegistry
Geo::PackageFileRegistry,
Geo::ProjectRegistry,
Geo::UploadRegistry
].freeze
BATCH_SIZE = 1000
......
......@@ -3,10 +3,13 @@
class GeoRepositoryDestroyWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
include GeoQueue
include ::Gitlab::Geo::LogHelpers
loggable_arguments 1, 2, 3
def perform(id, name, disk_path, storage_name)
def perform(id, name = nil, disk_path = nil, storage_name = nil)
log_info('Executing Geo::RepositoryDestroyService', id: id, name: name, disk_path: disk_path, storage_name: storage_name)
Geo::RepositoryDestroyService.new(id, name, disk_path, storage_name).execute
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::ProjectRegistryFinder, :geo do
let_it_be(:project_1) { create(:project) }
let_it_be(:project_2) { create(:project) }
let_it_be(:project_3) { create(:project) }
let_it_be(:project_4) { create(:project) }
let_it_be(:project_5) { create(:project) }
let_it_be(:project_6) { create(:project) }
let_it_be(:registry_project_1) { create(:geo_project_registry, :synced, project_id: project_1.id) }
let_it_be(:registry_project_2) { create(:geo_project_registry, :sync_failed, project_id: project_2.id) }
let_it_be(:registry_project_3) { create(:geo_project_registry, project_id: project_3.id) }
let_it_be(:registry_project_4) { create(:geo_project_registry, :repository_dirty, project_id: project_4.id, last_repository_synced_at: 2.days.ago) }
let_it_be(:registry_project_5) { create(:geo_project_registry, :wiki_dirty, project_id: project_5.id, last_repository_synced_at: 5.days.ago) }
let_it_be(:registry_project_6) { create(:geo_project_registry, project_id: project_6.id) }
describe '#find_never_synced_registries' do
it 'returns registries for projects that have never been synced' do
registries = subject.find_never_synced_registries(batch_size: 10)
expect(registries).to match_ids(registry_project_3, registry_project_6)
end
it 'excludes except_ids' do
registries = subject.find_never_synced_registries(batch_size: 10, except_ids: [project_3.id])
expect(registries).to match_ids(registry_project_6)
end
end
describe '#find_retryable_dirty_registries' do
it 'returns registries for projects that have been recently updated or that have never been synced' do
registries = subject.find_retryable_dirty_registries(batch_size: 10)
expect(registries).to match_ids(registry_project_2, registry_project_3, registry_project_4, registry_project_5, registry_project_6)
end
it 'excludes except_ids' do
registries = subject.find_retryable_dirty_registries(batch_size: 10, except_ids: [project_4.id, project_5.id, project_6.id])
expect(registries).to match_ids(registry_project_2, registry_project_3)
end
end
end
......@@ -2,9 +2,11 @@
require 'spec_helper'
RSpec.describe Geo::DeletedProject, type: :model do
RSpec.describe Geo::DeletedProject, :geo, type: :model do
include StubConfiguration
subject { described_class.new(id: 1, name: 'sample', disk_path: 'root/sample', repository_storage: 'foo') }
before do
storages = {
'foo' => { 'path' => 'tmp/tests/storage_foo' },
......@@ -14,11 +16,23 @@ RSpec.describe Geo::DeletedProject, type: :model do
stub_storage_settings(storages)
end
subject { described_class.new(id: 1, name: 'sample', disk_path: 'root/sample', repository_storage: 'foo') }
describe 'attributes' do
it { is_expected.to respond_to(:id) }
it { is_expected.to respond_to(:name) }
it { is_expected.to respond_to(:disk_path) }
end
it { is_expected.to respond_to(:id) }
it { is_expected.to respond_to(:name) }
it { is_expected.to respond_to(:disk_path) }
describe 'validations' do
it { is_expected.to validate_presence_of(:id) }
it { is_expected.to validate_presence_of(:name) }
it { is_expected.to validate_presence_of(:disk_path) }
end
describe 'attributes' do
it { is_expected.to respond_to(:id) }
it { is_expected.to respond_to(:name) }
it { is_expected.to respond_to(:disk_path) }
end
describe '#full_path' do
it 'is an alias for disk_path' do
......
......@@ -25,6 +25,154 @@ RSpec.describe Geo::ProjectRegistry, :geo_fdw do
it { is_expected.to validate_uniqueness_of(:project) }
end
describe '.find_registry_differences' do
let!(:secondary) { create(:geo_node) }
let!(:synced_group) { create(:group) }
let!(:nested_group) { create(:group, parent: synced_group) }
let!(:project_1) { create(:project, group: synced_group) }
let!(:project_2) { create(:project, group: nested_group) }
let!(:project_3) { create(:project) }
let!(:project_4) { create(:project) }
let!(:project_5) { create(:project, :broken_storage) }
let!(:project_6) { create(:project, :broken_storage) }
before do
stub_current_geo_node(secondary)
end
context 'untracked IDs' do
before do
create(:geo_project_registry, project_id: project_1.id)
create(:geo_project_registry, :sync_failed, project_id: project_3.id)
create(:geo_project_registry, project_id: project_5.id)
end
it 'includes project IDs without an entry on the tracking database' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_2.id, project_4.id, project_6.id])
end
it 'excludes projects outside the ID range' do
untracked_ids, _ = described_class.find_registry_differences(project_4.id..project_6.id)
expect(untracked_ids).to match_array([project_4.id, project_6.id])
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'excludes project IDs that are not in selectively synced projects' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_2.id])
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'excludes project IDs that are not in selectively synced projects' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_6.id])
end
end
end
context 'unused tracked IDs' do
context 'with an orphaned registry' do
let!(:orphaned) { create(:geo_project_registry, project_id: project_1.id) }
before do
project_1.delete
end
it 'includes tracked IDs that do not exist in the model table' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_1.id])
end
it 'excludes IDs outside the ID range' do
range = (project_1.id + 1)..Project.maximum(:id)
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'with a tracked project' do
context 'excluded from selective sync' do
let!(:registry_entry) { create(:geo_project_registry, project_id: project_3.id) }
it 'includes tracked project IDs that exist but are not in a selectively synced project' do
range = project_3.id..project_3.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_3.id])
end
end
context 'included in selective sync' do
let!(:registry_entry) { create(:geo_project_registry, project_id: project_1.id) }
it 'excludes tracked project IDs that are in selectively synced projects' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
context 'with a tracked project' do
let!(:registry_entry) { create(:geo_project_registry, project_id: project_1.id) }
context 'excluded from selective sync' do
it 'includes tracked project IDs that exist but are not in a selectively synced project' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_1.id])
end
end
context 'included in selective sync' do
let!(:registry_entry) { create(:geo_project_registry, project_id: project_5.id) }
it 'excludes tracked project IDs that are in selectively synced projects' do
range = project_5.id..project_5.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
end
end
describe '.synced_repos' do
it 'returns clean projects where last attempt to sync succeeded' do
expected = []
......
......@@ -46,6 +46,10 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
expect(registry_class).to respond_to(:delete_for_model_ids)
end
it 'responds to .find_registry_differences' do
expect(registry_class).to respond_to(:find_registry_differences)
end
it 'responds to .has_create_events?' do
expect(registry_class).to respond_to(:has_create_events?)
end
......
......@@ -2,7 +2,7 @@
require 'spec_helper'
RSpec.describe Geo::RepositoryDestroyService do
RSpec.describe Geo::RepositoryDestroyService, :geo do
include ::EE::GeoHelpers
let_it_be(:secondary) { create(:geo_node) }
......@@ -128,5 +128,49 @@ RSpec.describe Geo::RepositoryDestroyService do
expect(Geo::DesignRegistry.where(project: project)).to be_empty
end
end
context 'with an unused registry' do
let!(:project) { create(:project_empty_repo, :legacy_storage) }
let!(:unused_project_registry) { create(:geo_project_registry, project_id: project.id) }
let!(:unused_design_registry) { create(:geo_design_registry, project_id: project.id) }
subject(:service) { described_class.new(project.id) }
context 'when the replicable model does not exist' do
before do
project.delete
end
it 'does not delegate project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).not_to receive(:geo_replicate)
service.execute
end
it 'removes the registry entries' do
service.execute
expect(Geo::ProjectRegistry.where(project: project)).to be_empty
expect(Geo::DesignRegistry.where(project: project)).to be_empty
end
end
context 'when the replicable model exists' do
subject(:service) { described_class.new(project.id) }
it 'delegates project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
service.execute
end
it 'removes the registry entries' do
service.execute
expect(Geo::ProjectRegistry.where(project: project)).to be_empty
expect(Geo::DesignRegistry.where(project: project)).to be_empty
end
end
end
end
end
......@@ -8,7 +8,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
let!(:primary) { create(:geo_node, :primary) }
let!(:secondary) { create(:geo_node) }
let(:shard_name) { Gitlab.config.repositories.storages.each_key.first }
before do
......@@ -17,7 +16,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
describe '#perform' do
let!(:restricted_group) { create(:group) }
let!(:unsynced_project_in_restricted_group) { create(:project, group: restricted_group) }
let!(:unsynced_project) { create(:project) }
......@@ -27,21 +25,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
Gitlab::ShardHealthCache.update([shard_name])
end
it 'performs Geo::ProjectSyncWorker for each project' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker when shard becomes unhealthy' do
Gitlab::ShardHealthCache.update([])
......@@ -50,28 +33,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'does not schedule a job twice for the same project' do
scheduled_jobs = [
{ job_id: 1, project_id: unsynced_project.id },
{ job_id: 2, project_id: unsynced_project_in_restricted_group.id }
]
is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
it 'does not perform Geo::ProjectSyncWorker when no geo database is configured' do
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }
......@@ -100,210 +61,488 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
subject.perform(shard_name)
end
context 'multiple shards' do
it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
expect(subject).to receive(:schedule_jobs).twice.and_call_original
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
context 'number of scheduled jobs exceeds capacity' do
it 'schedules 0 jobs' do
is_expected.to receive(:scheduled_job_ids).and_return(1..1000).at_least(:once)
is_expected.not_to receive(:schedule_job)
subject.perform(shard_name)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
context 'when node has namespace restrictions', :request_store do
context 'when geo_project_registry_ssot_sync is enabled' do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [restricted_group])
stub_feature_flags(geo_project_registry_ssot_sync: true)
end
allow(::Gitlab::Geo).to receive(:current_node).and_call_original
Rails.cache.write(:current_node, secondary.to_json)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
it 'performs Geo::ProjectSyncWorker for each registry' do
create(:geo_project_registry, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: true)
.once
.and_return(spy)
it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
.once
.and_return(spy)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
end
context 'repositories that have never been updated' do
let!(:project_list) { create_list(:project, 4, last_repository_updated_at: 2.hours.ago) }
let!(:abandoned_project) { create(:project) }
it 'does not schedule a job twice for the same project' do
create(:geo_project_registry, project: unsynced_project)
create(:geo_project_registry, project: unsynced_project_in_restricted_group)
before do
# Project sync failed but never received an update
create(:geo_project_registry, :repository_sync_failed, project: abandoned_project)
abandoned_project.update_column(:last_repository_updated_at, 1.year.ago)
scheduled_jobs = [
{ job_id: 1, project_id: unsynced_project.id },
{ job_id: 2, project_id: unsynced_project_in_restricted_group.id }
]
# Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
is_expected.not_to receive(:schedule_job)
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
project_list.each do |project|
allow(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.and_call_original
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
before do
allow(Rails.cache).to receive(:read).and_call_original
allow(Rails.cache).to receive(:write).and_call_original
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
it 'sets the back off time when there are no pending items' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
end
it 'tries to sync project where last attempt to sync failed' do
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(abandoned_project.id, anything)
.at_least(:once)
.and_return(spy)
context 'repositories that have never been updated' do
let!(:project_list) { create_list(:project, 4, last_repository_updated_at: 2.hours.ago) }
let!(:abandoned_project) { create(:project) }
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
before do
# Project sync failed but never received an update
create(:geo_project_registry, :repository_sync_failed, project: abandoned_project)
abandoned_project.update_column(:last_repository_updated_at, 1.year.ago)
# Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
project_list.each do |project|
create(:geo_project_registry, project: project)
allow(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.and_call_original
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
end
it 'tries to sync project where last attempt to sync failed' do
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(abandoned_project.id, anything)
.at_least(:once)
.and_return(spy)
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
end
end
context 'projects that require resync' do
context 'when project repository is dirty' do
it 'syncs repository only' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
context 'multiple shards' do
it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
create(:geo_project_registry, project: unsynced_project)
create(:geo_project_registry, project: unsynced_project_in_restricted_group)
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: true, sync_wiki: false)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
expect(subject).to receive(:schedule_jobs).twice.and_call_original
subject.perform(shard_name)
end
end
context 'when project wiki is dirty' do
it 'syncs wiki only' do
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project_in_restricted_group)
context 'all repositories fail' do
let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: false, sync_wiki: true)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: false, sync_wiki: true)
before do
# Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
subject.perform(shard_name)
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
allow_next_instance_of(Project) do |instance|
allow(instance).to receive(:ensure_repository).and_raise(Gitlab::Shell::Error.new('foo'))
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
allow_next_instance_of(Geo::ProjectHousekeepingService) do |instance|
allow(instance).to receive(:do_housekeeping)
end
end
end
end
context 'all repositories fail' do
let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
it 'tries to sync every project' do
project_list.each do |project|
create(:geo_project_registry, project: project)
before do
# Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.at_least(:once)
.and_call_original
end
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
allow_next_instance_of(Project) do |instance|
allow(instance).to receive(:ensure_repository).and_raise(Gitlab::Shell::Error.new('foo'))
context 'projects that require resync' do
context 'when project repository is dirty' do
it 'does not sync repositories' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unsynced_project.id, sync_repository: true, sync_wiki: false)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
subject.perform(shard_name)
end
end
context 'when project wiki is dirty' do
it 'does not syn wikis' do
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unsynced_project.id, sync_repository: false, sync_wiki: true)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: false, sync_wiki: true)
subject.perform(shard_name)
end
end
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
context 'additional shards' do
it 'skips backfill for projects on unhealthy shards' do
missing_not_synced = create(:project, group: restricted_group)
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: restricted_group)
missing_dirty.update_column(:repository_storage, 'unknown')
create(:geo_project_registry, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, project: missing_dirty)
create(:geo_project_registry, project: missing_not_synced)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_not_synced.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_dirty.id, anything)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
end
context 'when geo_project_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_project_registry_ssot_sync: false)
end
it 'performs Geo::ProjectSyncWorker for each project' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'does not schedule a job twice for the same project' do
scheduled_jobs = [
{ job_id: 1, project_id: unsynced_project.id },
{ job_id: 2, project_id: unsynced_project_in_restricted_group.id }
]
is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
before do
allow(Rails.cache).to receive(:read).and_call_original
allow(Rails.cache).to receive(:write).and_call_original
end
it 'sets the back off time when there are no pending items' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end
allow_next_instance_of(Geo::ProjectHousekeepingService) do |instance|
allow(instance).to receive(:do_housekeeping)
it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
end
it 'tries to sync every project' do
project_list.each do |project|
context 'repositories that have never been updated' do
let!(:project_list) { create_list(:project, 4, last_repository_updated_at: 2.hours.ago) }
let!(:abandoned_project) { create(:project) }
before do
# Project sync failed but never received an update
create(:geo_project_registry, :repository_sync_failed, project: abandoned_project)
abandoned_project.update_column(:last_repository_updated_at, 1.year.ago)
# Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
project_list.each do |project|
allow(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.and_call_original
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
end
it 'tries to sync project where last attempt to sync failed' do
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.with(abandoned_project.id, anything)
.at_least(:once)
.and_call_original
.and_return(spy)
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
end
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
context 'multiple shards' do
it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
expect(subject).to receive(:schedule_jobs).twice.and_call_original
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
subject.perform(shard_name)
end
end
end
context 'additional shards' do
it 'skips backfill for projects on unhealthy shards' do
missing_not_synced = create(:project, group: restricted_group)
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: restricted_group)
missing_dirty.update_column(:repository_storage, 'unknown')
context 'when node has namespace restrictions', :request_store do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [restricted_group])
create(:geo_project_registry, :synced, :repository_dirty, project: missing_dirty)
allow(::Gitlab::Geo).to receive(:current_node).and_call_original
Rails.cache.write(:current_node, secondary.to_json)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
end
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_not_synced.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_dirty.id, anything)
it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: true)
.once
.and_return(spy)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
subject.perform(shard_name)
end
context 'number of scheduled jobs exceeds capacity' do
it 'schedules 0 jobs' do
is_expected.to receive(:scheduled_job_ids).and_return(1..1000).at_least(:once)
is_expected.not_to receive(:schedule_job)
it 'does not perform Geo::ProjectSyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
.once
.and_return(spy)
subject.perform(shard_name)
end
end
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
context 'all repositories fail' do
let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
before do
allow(Rails.cache).to receive(:read).and_call_original
allow(Rails.cache).to receive(:write).and_call_original
before do
# Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
allow_next_instance_of(Project) do |instance|
allow(instance).to receive(:ensure_repository).and_raise(Gitlab::Shell::Error.new('foo'))
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
allow_next_instance_of(Geo::ProjectHousekeepingService) do |instance|
allow(instance).to receive(:do_housekeeping)
end
end
it 'tries to sync every project' do
project_list.each do |project|
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.at_least(:once)
.and_call_original
end
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
end
it 'sets the back off time when there are no pending items' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
context 'additional shards' do
it 'skips backfill for projects on unhealthy shards' do
missing_not_synced = create(:project, group: restricted_group)
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: restricted_group)
missing_dirty.update_column(:repository_storage, 'unknown')
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
create(:geo_project_registry, :synced, :repository_dirty, project: missing_dirty)
subject.perform(shard_name)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_not_synced.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_dirty.id, anything)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
context 'projects that require resync' do
context 'when project repository is dirty' do
it 'syncs repository only' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: true, sync_wiki: false)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
subject.perform(shard_name)
subject.perform(shard_name)
end
end
context 'when project wiki is dirty' do
it 'syncs wiki only' do
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: false, sync_wiki: true)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: false, sync_wiki: true)
subject.perform(shard_name)
end
end
end
end
end
......
......@@ -76,13 +76,15 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
# Somewhat of an integration test
it 'creates missing registries for each registry class' do
lfs_object = create(:lfs_object)
job_artifact = create(:ci_job_artifact)
lfs_object = create(:lfs_object)
project = create(:project)
upload = create(:upload)
package_file = create(:conan_package_file, :conan_package)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
expect(Geo::ProjectRegistry.where(project_id: project.id).count).to eq(0)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(0)
......@@ -90,13 +92,12 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1)
expect(Geo::ProjectRegistry.where(project_id: project.id).count).to eq(1)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
let_it_be(:job_artifact) { create(:ci_job_artifact) }
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
......@@ -107,6 +108,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
......@@ -117,8 +119,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end
context 'when geo_file_registry_ssot_sync is disabled' do
let_it_be(:upload) { create(:upload) }
before do
stub_feature_flags(geo_file_registry_ssot_sync: false)
end
......@@ -131,6 +131,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::UploadRegistry, batch_size: 1000)
......@@ -138,6 +139,27 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end
end
context 'when geo_project_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_project_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for projects' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000)
subject.perform
end
end
context 'when the current Geo node is disabled or primary' do
before do
stub_primary_node
......
......@@ -2,16 +2,31 @@
require 'spec_helper'
RSpec.describe GeoRepositoryDestroyWorker do
RSpec.describe GeoRepositoryDestroyWorker, :geo do
describe '#perform' do
it 'delegates project removal to Geo::RepositoryDestroyService' do
project = create(:project)
let(:project) { create(:project) }
expect_next_instance_of(Geo::RepositoryDestroyService) do |instance|
expect(instance).to receive(:execute)
context 'with an existing project' do
it 'delegates project removal to Geo::RepositoryDestroyService' do
expect_next_instance_of(Geo::RepositoryDestroyService) do |instance|
expect(instance).to receive(:execute)
end
subject.perform(project.id, project.name, project.path, 'default')
end
end
context 'with project ID from an orphaned registry' do
it 'delegates project removal to Geo::RepositoryDestroyService' do
registry = create(:geo_project_registry, project_id: project.id)
project.delete
described_class.new.perform(project.id, project.name, project.path, 'default')
expect_next_instance_of(Geo::RepositoryDestroyService) do |instance|
expect(instance).to receive(:execute)
end
subject.perform(registry.project_id)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment