Commit b7f6169e authored by Mayra Cabrera's avatar Mayra Cabrera

Merge branch '212351-registry-tables-as-ssot-for-projects' into 'master'

Geo - Make Registry tables as SSOT to sync projects and wikis

See merge request gitlab-org/gitlab!34342
parents 534cd66b bf7fffdb
# frozen_string_literal: true
module Geo
class ProjectRegistryFinder
# Returns ProjectRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_dirty_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::ProjectRegistry
.never_synced
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_dirty_registries(batch_size:, except_ids: [])
Geo::ProjectRegistry
.dirty
.retry_due
.model_id_not_in(except_ids)
.order(Gitlab::Database.nulls_first_order(:last_repository_synced_at))
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
end
end
...@@ -11,6 +11,10 @@ class Geo::BaseRegistry < Geo::TrackingBase ...@@ -11,6 +11,10 @@ class Geo::BaseRegistry < Geo::TrackingBase
where(self::MODEL_FOREIGN_KEY => range).pluck(self::MODEL_FOREIGN_KEY) where(self::MODEL_FOREIGN_KEY => range).pluck(self::MODEL_FOREIGN_KEY)
end end
def self.pluck_model_foreign_key
where(nil).pluck(self::MODEL_FOREIGN_KEY)
end
def self.model_id_in(ids) def self.model_id_in(ids)
where(self::MODEL_FOREIGN_KEY => ids) where(self::MODEL_FOREIGN_KEY => ids)
end end
......
# frozen_string_literal: true # frozen_string_literal: true
class Geo::DeletedProject class Geo::DeletedProject
attr_reader :id, :name, :disk_path include ActiveModel::Validations
attr_accessor :id, :name, :disk_path
validates :id, :name, :disk_path, presence: true
def initialize(id:, name:, disk_path:, repository_storage:) def initialize(id:, name:, disk_path:, repository_storage:)
@id = id @id = id
......
...@@ -3,6 +3,9 @@ ...@@ -3,6 +3,9 @@
class Geo::DesignRegistry < Geo::BaseRegistry class Geo::DesignRegistry < Geo::BaseRegistry
include ::Delay include ::Delay
MODEL_CLASS = ::Project
MODEL_FOREIGN_KEY = :project_id
RETRIES_BEFORE_REDOWNLOAD = 5 RETRIES_BEFORE_REDOWNLOAD = 5
belongs_to :project belongs_to :project
......
...@@ -5,6 +5,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -5,6 +5,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
include ::EachBatch include ::EachBatch
include ::ShaAttribute include ::ShaAttribute
MODEL_CLASS = ::Project
MODEL_FOREIGN_KEY = :project_id
REGISTRY_TYPES = %i{repository wiki}.freeze REGISTRY_TYPES = %i{repository wiki}.freeze
RETRIES_BEFORE_REDOWNLOAD = 5 RETRIES_BEFORE_REDOWNLOAD = 5
...@@ -39,6 +42,34 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -39,6 +42,34 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
where(nil).pluck(:project_id) where(nil).pluck(:project_id)
end end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_project_registry_ssot_sync)
end
def self.has_create_events?
true
end
def self.find_registry_differences(range)
source_ids = Gitlab::Geo.current_node.projects.id_in(range).pluck_primary_key
tracked_ids = self.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end
def self.delete_worker_class
::GeoRepositoryDestroyWorker
end
def self.delete_for_model_ids(project_ids)
project_ids.map do |project_id|
delete_worker_class.perform_async(project_id)
end
end
def self.failed def self.failed
repository_sync_failed = arel_table[:repository_retry_count].gt(0) repository_sync_failed = arel_table[:repository_retry_count].gt(0)
wiki_sync_failed = arel_table[:wiki_retry_count].gt(0) wiki_sync_failed = arel_table[:wiki_retry_count].gt(0)
......
...@@ -3,10 +3,14 @@ ...@@ -3,10 +3,14 @@
module Geo module Geo
class RepositoryDestroyService class RepositoryDestroyService
include ::Gitlab::Geo::LogHelpers include ::Gitlab::Geo::LogHelpers
include ::Gitlab::Utils::StrongMemoize
attr_reader :id, :name, :disk_path, :repository_storage attr_reader :id, :name, :disk_path, :repository_storage
def initialize(id, name, disk_path, repository_storage) # There is a possibility that the replicable's record does not exist
# anymore. In this case, you need to pass the optional parameters
# explicitly.
def initialize(id, name = nil, disk_path = nil, repository_storage = nil)
@id = id @id = id
@name = name @name = name
@disk_path = disk_path @disk_path = disk_path
...@@ -29,25 +33,36 @@ module Geo ...@@ -29,25 +33,36 @@ module Geo
private private
def destroy_project def destroy_project
::Projects::DestroyService.new(deleted_project, nil).geo_replicate # We should skip if we had to rebuild the project, but we don't
# have the information that our service class requires.
return if project.is_a?(Geo::DeletedProject) && !project.valid?
::Projects::DestroyService.new(project, nil).geo_replicate
end end
# rubocop: disable CodeReuse/ActiveRecord
def destroy_registry_entries def destroy_registry_entries
::Geo::ProjectRegistry.where(project_id: id).delete_all ::Geo::ProjectRegistry.model_id_in(id).delete_all
::Geo::DesignRegistry.where(project_id: id).delete_all ::Geo::DesignRegistry.model_id_in(id).delete_all
log_info("Registry entries removed", project_id: id) log_info('Registry entries removed', project_id: id)
end end
# rubocop: enable CodeReuse/ActiveRecord
def project
def deleted_project strong_memoize(:project) do
# We don't have access to the original model anymore, so we are Project.find(id)
# rebuilding only what our service class requires rescue ActiveRecord::RecordNotFound => e
::Geo::DeletedProject.new(id: id, # When cleaning up project/registries, there are some cases where
name: name, # the replicable record does not exist anymore. So, we try to
disk_path: disk_path, # rebuild it with only what our service class requires.
repository_storage: repository_storage) log_error('Could not find project', e.message)
::Geo::DeletedProject.new(
id: id,
name: name,
disk_path: disk_path,
repository_storage: repository_storage
)
end
end end
end end
end end
...@@ -10,15 +10,15 @@ module Geo ...@@ -10,15 +10,15 @@ module Geo
{ project_id: project_id, job_id: job_id } if job_id { project_id: project_id, job_id: job_id } if job_id
end end
def find_project_ids_not_synced(batch_size:) def find_project_ids_not_synced(except_ids:, batch_size:)
Geo::DesignUnsyncedFinder Geo::DesignUnsyncedFinder
.new(scheduled_project_ids: scheduled_project_ids, shard_name: shard_name, batch_size: batch_size) .new(scheduled_project_ids: except_ids, shard_name: shard_name, batch_size: batch_size)
.execute .execute
end end
def find_project_ids_updated_recently(batch_size:) def find_project_ids_updated_recently(except_ids:, batch_size:)
Geo::DesignUpdatedRecentlyFinder Geo::DesignUpdatedRecentlyFinder
.new(scheduled_project_ids: scheduled_project_ids, shard_name: shard_name, batch_size: batch_size) .new(scheduled_project_ids: except_ids, shard_name: shard_name, batch_size: batch_size)
.execute .execute
end end
end end
......
...@@ -62,22 +62,31 @@ module Geo ...@@ -62,22 +62,31 @@ module Geo
end end
def load_pending_resources def load_pending_resources
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size) return [] unless valid_shard?
resources = find_project_ids_not_synced(except_ids: scheduled_project_ids, batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero? if remaining_capacity.zero?
resources resources
else else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity) resources + find_project_ids_updated_recently(except_ids: scheduled_project_ids + resources, batch_size: remaining_capacity)
end end
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_not_synced(batch_size:) def find_project_ids_not_synced(except_ids:, batch_size:)
find_unsynced_projects(batch_size: batch_size) if Geo::ProjectRegistry.registry_consistency_worker_enabled?
.id_not_in(scheduled_project_ids) project_ids =
.reorder(last_repository_updated_at: :desc) find_never_synced_project_ids(batch_size: batch_size, except_ids: except_ids)
.pluck_primary_key
find_project_ids_within_shard(project_ids, direction: :desc)
else
find_unsynced_projects(batch_size: batch_size)
.id_not_in(except_ids)
.reorder(last_repository_updated_at: :desc)
.pluck_primary_key
end
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
...@@ -88,11 +97,18 @@ module Geo ...@@ -88,11 +97,18 @@ module Geo
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_updated_recently(batch_size:) def find_project_ids_updated_recently(except_ids:, batch_size:)
find_projects_updated_recently(batch_size: batch_size) if Geo::ProjectRegistry.registry_consistency_worker_enabled?
.id_not_in(scheduled_project_ids) project_ids =
.order('project_registry.last_repository_synced_at ASC NULLS FIRST, projects.last_repository_updated_at ASC') find_retryable_dirty_project_ids(batch_size: batch_size, except_ids: except_ids)
.pluck_primary_key
find_project_ids_within_shard(project_ids, direction: :asc)
else
find_projects_updated_recently(batch_size: batch_size)
.id_not_in(except_ids)
.order('project_registry.last_repository_synced_at ASC NULLS FIRST, projects.last_repository_updated_at ASC')
.pluck_primary_key
end
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
...@@ -101,5 +117,37 @@ module Geo ...@@ -101,5 +117,37 @@ module Geo
.new(current_node: current_node, shard_name: shard_name, batch_size: batch_size) .new(current_node: current_node, shard_name: shard_name, batch_size: batch_size)
.execute .execute
end end
def valid_shard?
return true unless current_node.selective_sync_by_shards?
current_node.selective_sync_shards.include?(shard_name)
end
def find_never_synced_project_ids(batch_size:, except_ids:)
registry_finder
.find_never_synced_registries(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key
end
def find_retryable_dirty_project_ids(batch_size:, except_ids:)
registry_finder
.find_retryable_dirty_registries(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key
end
# rubocop:disable CodeReuse/ActiveRecord
def find_project_ids_within_shard(project_ids, direction:)
Project
.id_in(project_ids)
.within_shards(shard_name)
.reorder(last_repository_updated_at: direction)
.pluck_primary_key
end
# rubocop:enable CodeReuse/ActiveRecord
def registry_finder
@registry_finder ||= Geo::ProjectRegistryFinder.new
end
end end
end end
...@@ -18,8 +18,9 @@ module Geo ...@@ -18,8 +18,9 @@ module Geo
REGISTRY_CLASSES = [ REGISTRY_CLASSES = [
Geo::JobArtifactRegistry, Geo::JobArtifactRegistry,
Geo::LfsObjectRegistry, Geo::LfsObjectRegistry,
Geo::UploadRegistry, Geo::PackageFileRegistry,
Geo::PackageFileRegistry Geo::ProjectRegistry,
Geo::UploadRegistry
].freeze ].freeze
BATCH_SIZE = 1000 BATCH_SIZE = 1000
......
...@@ -3,10 +3,13 @@ ...@@ -3,10 +3,13 @@
class GeoRepositoryDestroyWorker # rubocop:disable Scalability/IdempotentWorker class GeoRepositoryDestroyWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
include GeoQueue include GeoQueue
include ::Gitlab::Geo::LogHelpers
loggable_arguments 1, 2, 3 loggable_arguments 1, 2, 3
def perform(id, name, disk_path, storage_name) def perform(id, name = nil, disk_path = nil, storage_name = nil)
log_info('Executing Geo::RepositoryDestroyService', id: id, name: name, disk_path: disk_path, storage_name: storage_name)
Geo::RepositoryDestroyService.new(id, name, disk_path, storage_name).execute Geo::RepositoryDestroyService.new(id, name, disk_path, storage_name).execute
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::ProjectRegistryFinder, :geo do
let_it_be(:project_1) { create(:project) }
let_it_be(:project_2) { create(:project) }
let_it_be(:project_3) { create(:project) }
let_it_be(:project_4) { create(:project) }
let_it_be(:project_5) { create(:project) }
let_it_be(:project_6) { create(:project) }
let_it_be(:registry_project_1) { create(:geo_project_registry, :synced, project_id: project_1.id) }
let_it_be(:registry_project_2) { create(:geo_project_registry, :sync_failed, project_id: project_2.id) }
let_it_be(:registry_project_3) { create(:geo_project_registry, project_id: project_3.id) }
let_it_be(:registry_project_4) { create(:geo_project_registry, :repository_dirty, project_id: project_4.id, last_repository_synced_at: 2.days.ago) }
let_it_be(:registry_project_5) { create(:geo_project_registry, :wiki_dirty, project_id: project_5.id, last_repository_synced_at: 5.days.ago) }
let_it_be(:registry_project_6) { create(:geo_project_registry, project_id: project_6.id) }
describe '#find_never_synced_registries' do
it 'returns registries for projects that have never been synced' do
registries = subject.find_never_synced_registries(batch_size: 10)
expect(registries).to match_ids(registry_project_3, registry_project_6)
end
it 'excludes except_ids' do
registries = subject.find_never_synced_registries(batch_size: 10, except_ids: [project_3.id])
expect(registries).to match_ids(registry_project_6)
end
end
describe '#find_retryable_dirty_registries' do
it 'returns registries for projects that have been recently updated or that have never been synced' do
registries = subject.find_retryable_dirty_registries(batch_size: 10)
expect(registries).to match_ids(registry_project_2, registry_project_3, registry_project_4, registry_project_5, registry_project_6)
end
it 'excludes except_ids' do
registries = subject.find_retryable_dirty_registries(batch_size: 10, except_ids: [project_4.id, project_5.id, project_6.id])
expect(registries).to match_ids(registry_project_2, registry_project_3)
end
end
end
...@@ -2,9 +2,11 @@ ...@@ -2,9 +2,11 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::DeletedProject, type: :model do RSpec.describe Geo::DeletedProject, :geo, type: :model do
include StubConfiguration include StubConfiguration
subject { described_class.new(id: 1, name: 'sample', disk_path: 'root/sample', repository_storage: 'foo') }
before do before do
storages = { storages = {
'foo' => { 'path' => 'tmp/tests/storage_foo' }, 'foo' => { 'path' => 'tmp/tests/storage_foo' },
...@@ -14,11 +16,23 @@ RSpec.describe Geo::DeletedProject, type: :model do ...@@ -14,11 +16,23 @@ RSpec.describe Geo::DeletedProject, type: :model do
stub_storage_settings(storages) stub_storage_settings(storages)
end end
subject { described_class.new(id: 1, name: 'sample', disk_path: 'root/sample', repository_storage: 'foo') } describe 'attributes' do
it { is_expected.to respond_to(:id) }
it { is_expected.to respond_to(:name) }
it { is_expected.to respond_to(:disk_path) }
end
it { is_expected.to respond_to(:id) } describe 'validations' do
it { is_expected.to respond_to(:name) } it { is_expected.to validate_presence_of(:id) }
it { is_expected.to respond_to(:disk_path) } it { is_expected.to validate_presence_of(:name) }
it { is_expected.to validate_presence_of(:disk_path) }
end
describe 'attributes' do
it { is_expected.to respond_to(:id) }
it { is_expected.to respond_to(:name) }
it { is_expected.to respond_to(:disk_path) }
end
describe '#full_path' do describe '#full_path' do
it 'is an alias for disk_path' do it 'is an alias for disk_path' do
......
...@@ -25,6 +25,154 @@ RSpec.describe Geo::ProjectRegistry, :geo_fdw do ...@@ -25,6 +25,154 @@ RSpec.describe Geo::ProjectRegistry, :geo_fdw do
it { is_expected.to validate_uniqueness_of(:project) } it { is_expected.to validate_uniqueness_of(:project) }
end end
describe '.find_registry_differences' do
let!(:secondary) { create(:geo_node) }
let!(:synced_group) { create(:group) }
let!(:nested_group) { create(:group, parent: synced_group) }
let!(:project_1) { create(:project, group: synced_group) }
let!(:project_2) { create(:project, group: nested_group) }
let!(:project_3) { create(:project) }
let!(:project_4) { create(:project) }
let!(:project_5) { create(:project, :broken_storage) }
let!(:project_6) { create(:project, :broken_storage) }
before do
stub_current_geo_node(secondary)
end
context 'untracked IDs' do
before do
create(:geo_project_registry, project_id: project_1.id)
create(:geo_project_registry, :sync_failed, project_id: project_3.id)
create(:geo_project_registry, project_id: project_5.id)
end
it 'includes project IDs without an entry on the tracking database' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_2.id, project_4.id, project_6.id])
end
it 'excludes projects outside the ID range' do
untracked_ids, _ = described_class.find_registry_differences(project_4.id..project_6.id)
expect(untracked_ids).to match_array([project_4.id, project_6.id])
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'excludes project IDs that are not in selectively synced projects' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_2.id])
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'excludes project IDs that are not in selectively synced projects' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_6.id])
end
end
end
context 'unused tracked IDs' do
context 'with an orphaned registry' do
let!(:orphaned) { create(:geo_project_registry, project_id: project_1.id) }
before do
project_1.delete
end
it 'includes tracked IDs that do not exist in the model table' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_1.id])
end
it 'excludes IDs outside the ID range' do
range = (project_1.id + 1)..Project.maximum(:id)
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'with a tracked project' do
context 'excluded from selective sync' do
let!(:registry_entry) { create(:geo_project_registry, project_id: project_3.id) }
it 'includes tracked project IDs that exist but are not in a selectively synced project' do
range = project_3.id..project_3.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_3.id])
end
end
context 'included in selective sync' do
let!(:registry_entry) { create(:geo_project_registry, project_id: project_1.id) }
it 'excludes tracked project IDs that are in selectively synced projects' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
context 'with a tracked project' do
let!(:registry_entry) { create(:geo_project_registry, project_id: project_1.id) }
context 'excluded from selective sync' do
it 'includes tracked project IDs that exist but are not in a selectively synced project' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_1.id])
end
end
context 'included in selective sync' do
let!(:registry_entry) { create(:geo_project_registry, project_id: project_5.id) }
it 'excludes tracked project IDs that are in selectively synced projects' do
range = project_5.id..project_5.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
end
end
describe '.synced_repos' do describe '.synced_repos' do
it 'returns clean projects where last attempt to sync succeeded' do it 'returns clean projects where last attempt to sync succeeded' do
expected = [] expected = []
......
...@@ -46,6 +46,10 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st ...@@ -46,6 +46,10 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
expect(registry_class).to respond_to(:delete_for_model_ids) expect(registry_class).to respond_to(:delete_for_model_ids)
end end
it 'responds to .find_registry_differences' do
expect(registry_class).to respond_to(:find_registry_differences)
end
it 'responds to .has_create_events?' do it 'responds to .has_create_events?' do
expect(registry_class).to respond_to(:has_create_events?) expect(registry_class).to respond_to(:has_create_events?)
end end
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::RepositoryDestroyService do RSpec.describe Geo::RepositoryDestroyService, :geo do
include ::EE::GeoHelpers include ::EE::GeoHelpers
let_it_be(:secondary) { create(:geo_node) } let_it_be(:secondary) { create(:geo_node) }
...@@ -128,5 +128,49 @@ RSpec.describe Geo::RepositoryDestroyService do ...@@ -128,5 +128,49 @@ RSpec.describe Geo::RepositoryDestroyService do
expect(Geo::DesignRegistry.where(project: project)).to be_empty expect(Geo::DesignRegistry.where(project: project)).to be_empty
end end
end end
context 'with an unused registry' do
let!(:project) { create(:project_empty_repo, :legacy_storage) }
let!(:unused_project_registry) { create(:geo_project_registry, project_id: project.id) }
let!(:unused_design_registry) { create(:geo_design_registry, project_id: project.id) }
subject(:service) { described_class.new(project.id) }
context 'when the replicable model does not exist' do
before do
project.delete
end
it 'does not delegate project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).not_to receive(:geo_replicate)
service.execute
end
it 'removes the registry entries' do
service.execute
expect(Geo::ProjectRegistry.where(project: project)).to be_empty
expect(Geo::DesignRegistry.where(project: project)).to be_empty
end
end
context 'when the replicable model exists' do
subject(:service) { described_class.new(project.id) }
it 'delegates project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
service.execute
end
it 'removes the registry entries' do
service.execute
expect(Geo::ProjectRegistry.where(project: project)).to be_empty
expect(Geo::DesignRegistry.where(project: project)).to be_empty
end
end
end
end end
end end
...@@ -8,7 +8,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red ...@@ -8,7 +8,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
let!(:primary) { create(:geo_node, :primary) } let!(:primary) { create(:geo_node, :primary) }
let!(:secondary) { create(:geo_node) } let!(:secondary) { create(:geo_node) }
let(:shard_name) { Gitlab.config.repositories.storages.each_key.first } let(:shard_name) { Gitlab.config.repositories.storages.each_key.first }
before do before do
...@@ -17,7 +16,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red ...@@ -17,7 +16,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
describe '#perform' do describe '#perform' do
let!(:restricted_group) { create(:group) } let!(:restricted_group) { create(:group) }
let!(:unsynced_project_in_restricted_group) { create(:project, group: restricted_group) } let!(:unsynced_project_in_restricted_group) { create(:project, group: restricted_group) }
let!(:unsynced_project) { create(:project) } let!(:unsynced_project) { create(:project) }
...@@ -27,21 +25,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red ...@@ -27,21 +25,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
Gitlab::ShardHealthCache.update([shard_name]) Gitlab::ShardHealthCache.update([shard_name])
end end
it 'performs Geo::ProjectSyncWorker for each project' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker when shard becomes unhealthy' do it 'does not perform Geo::ProjectSyncWorker when shard becomes unhealthy' do
Gitlab::ShardHealthCache.update([]) Gitlab::ShardHealthCache.update([])
...@@ -50,28 +33,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red ...@@ -50,28 +33,6 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
subject.perform(shard_name) subject.perform(shard_name)
end end
it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'does not schedule a job twice for the same project' do
scheduled_jobs = [
{ job_id: 1, project_id: unsynced_project.id },
{ job_id: 2, project_id: unsynced_project_in_restricted_group.id }
]
is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
it 'does not perform Geo::ProjectSyncWorker when no geo database is configured' do it 'does not perform Geo::ProjectSyncWorker when no geo database is configured' do
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false } allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }
...@@ -100,210 +61,488 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red ...@@ -100,210 +61,488 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_red
subject.perform(shard_name) subject.perform(shard_name)
end end
context 'multiple shards' do context 'number of scheduled jobs exceeds capacity' do
it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do it 'schedules 0 jobs' do
expect(subject).to receive(:schedule_jobs).twice.and_call_original is_expected.to receive(:scheduled_job_ids).and_return(1..1000).at_least(:once)
is_expected.not_to receive(:schedule_job)
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
subject.perform(shard_name) Sidekiq::Testing.inline! { subject.perform(shard_name) }
end end
end end
context 'when node has namespace restrictions', :request_store do context 'when geo_project_registry_ssot_sync is enabled' do
before do before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [restricted_group]) stub_feature_flags(geo_project_registry_ssot_sync: true)
end
allow(::Gitlab::Geo).to receive(:current_node).and_call_original it 'performs Geo::ProjectSyncWorker for each registry' do
Rails.cache.write(:current_node, secondary.to_json) create(:geo_project_registry, project: unsynced_project)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end end
it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async) create(:geo_project_registry, :sync_failed, project: unsynced_project_in_restricted_group)
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: true) create(:geo_project_registry, :synced, project: unsynced_project)
.once
.and_return(spy) expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name) subject.perform(shard_name)
end end
it 'does not perform Geo::ProjectSyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group) create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project) create(:geo_project_registry, :synced, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async) expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
.once
.and_return(spy)
subject.perform(shard_name) subject.perform(shard_name)
end end
end
context 'repositories that have never been updated' do it 'does not schedule a job twice for the same project' do
let!(:project_list) { create_list(:project, 4, last_repository_updated_at: 2.hours.ago) } create(:geo_project_registry, project: unsynced_project)
let!(:abandoned_project) { create(:project) } create(:geo_project_registry, project: unsynced_project_in_restricted_group)
before do scheduled_jobs = [
# Project sync failed but never received an update { job_id: 1, project_id: unsynced_project.id },
create(:geo_project_registry, :repository_sync_failed, project: abandoned_project) { job_id: 2, project_id: unsynced_project_in_restricted_group.id }
abandoned_project.update_column(:last_repository_updated_at, 1.year.ago) ]
# Neither of these are needed for this spec is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
unsynced_project.destroy is_expected.not_to receive(:schedule_job)
unsynced_project_in_restricted_group.destroy
allow_next_instance_of(described_class) do |instance| Sidekiq::Testing.inline! { subject.perform(shard_name) }
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave end
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
project_list.each do |project| context 'backoff time' do
allow(Geo::ProjectSyncWorker) let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
.to receive(:perform_async)
.with(project.id, anything) before do
.and_call_original allow(Rails.cache).to receive(:read).and_call_original
allow(Rails.cache).to receive(:write).and_call_original
end end
allow_next_instance_of(Geo::ProjectRegistry) do |instance| it 'sets the back off time when there are no pending items' do
allow(instance).to receive(:wiki_sync_due?).and_return(false) create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches) it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end end
end end
it 'tries to sync project where last attempt to sync failed' do context 'repositories that have never been updated' do
expect(Geo::ProjectSyncWorker) let!(:project_list) { create_list(:project, 4, last_repository_updated_at: 2.hours.ago) }
.to receive(:perform_async) let!(:abandoned_project) { create(:project) }
.with(abandoned_project.id, anything)
.at_least(:once)
.and_return(spy)
3.times do before do
Sidekiq::Testing.inline! { subject.perform(shard_name) } # Project sync failed but never received an update
create(:geo_project_registry, :repository_sync_failed, project: abandoned_project)
abandoned_project.update_column(:last_repository_updated_at, 1.year.ago)
# Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
project_list.each do |project|
create(:geo_project_registry, project: project)
allow(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.and_call_original
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
end
it 'tries to sync project where last attempt to sync failed' do
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(abandoned_project.id, anything)
.at_least(:once)
.and_return(spy)
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end end
end end
end
context 'projects that require resync' do context 'multiple shards' do
context 'when project repository is dirty' do it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
it 'syncs repository only' do create(:geo_project_registry, project: unsynced_project)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project) create(:geo_project_registry, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: true, sync_wiki: false) expect(subject).to receive(:schedule_jobs).twice.and_call_original
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
subject.perform(shard_name) subject.perform(shard_name)
end end
end end
context 'when project wiki is dirty' do context 'all repositories fail' do
it 'syncs wiki only' do let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: false, sync_wiki: true) before do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: false, sync_wiki: true) # Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
subject.perform(shard_name) allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
allow_next_instance_of(Project) do |instance|
allow(instance).to receive(:ensure_repository).and_raise(Gitlab::Shell::Error.new('foo'))
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
allow_next_instance_of(Geo::ProjectHousekeepingService) do |instance|
allow(instance).to receive(:do_housekeeping)
end
end end
end
end
context 'all repositories fail' do it 'tries to sync every project' do
let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) } project_list.each do |project|
create(:geo_project_registry, project: project)
before do expect(Geo::ProjectSyncWorker)
# Neither of these are needed for this spec .to receive(:perform_async)
unsynced_project.destroy .with(project.id, anything)
unsynced_project_in_restricted_group.destroy .at_least(:once)
.and_call_original
end
allow_next_instance_of(described_class) do |instance| 3.times do
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
allow_next_instance_of(Project) do |instance| context 'projects that require resync' do
allow(instance).to receive(:ensure_repository).and_raise(Gitlab::Shell::Error.new('foo')) context 'when project repository is dirty' do
it 'does not sync repositories' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unsynced_project.id, sync_repository: true, sync_wiki: false)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
subject.perform(shard_name)
end
end
context 'when project wiki is dirty' do
it 'does not syn wikis' do
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unsynced_project.id, sync_repository: false, sync_wiki: true)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: false, sync_wiki: true)
subject.perform(shard_name)
end
end
end end
allow_next_instance_of(Geo::ProjectRegistry) do |instance| end
allow(instance).to receive(:wiki_sync_due?).and_return(false)
context 'additional shards' do
it 'skips backfill for projects on unhealthy shards' do
missing_not_synced = create(:project, group: restricted_group)
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: restricted_group)
missing_dirty.update_column(:repository_storage, 'unknown')
create(:geo_project_registry, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, project: missing_dirty)
create(:geo_project_registry, project: missing_not_synced)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_not_synced.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_dirty.id, anything)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end end
allow_next_instance_of(Geo::RepositorySyncService) do |instance| end
allow(instance).to receive(:expire_repository_caches) end
context 'when geo_project_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_project_registry_ssot_sync: false)
end
it 'performs Geo::ProjectSyncWorker for each project' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'does not schedule a job twice for the same project' do
scheduled_jobs = [
{ job_id: 1, project_id: unsynced_project.id },
{ job_id: 2, project_id: unsynced_project_in_restricted_group.id }
]
is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
before do
allow(Rails.cache).to receive(:read).and_call_original
allow(Rails.cache).to receive(:write).and_call_original
end
it 'sets the back off time when there are no pending items' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end end
allow_next_instance_of(Geo::ProjectHousekeepingService) do |instance|
allow(instance).to receive(:do_housekeeping) it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end end
end end
it 'tries to sync every project' do context 'repositories that have never been updated' do
project_list.each do |project| let!(:project_list) { create_list(:project, 4, last_repository_updated_at: 2.hours.ago) }
let!(:abandoned_project) { create(:project) }
before do
# Project sync failed but never received an update
create(:geo_project_registry, :repository_sync_failed, project: abandoned_project)
abandoned_project.update_column(:last_repository_updated_at, 1.year.ago)
# Neither of these are needed for this spec
unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
project_list.each do |project|
allow(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.and_call_original
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
end
it 'tries to sync project where last attempt to sync failed' do
expect(Geo::ProjectSyncWorker) expect(Geo::ProjectSyncWorker)
.to receive(:perform_async) .to receive(:perform_async)
.with(project.id, anything) .with(abandoned_project.id, anything)
.at_least(:once) .at_least(:once)
.and_call_original .and_return(spy)
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end end
end
3.times do context 'multiple shards' do
Sidekiq::Testing.inline! { subject.perform(shard_name) } it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
expect(subject).to receive(:schedule_jobs).twice.and_call_original
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
subject.perform(shard_name)
end end
end end
end
context 'additional shards' do context 'when node has namespace restrictions', :request_store do
it 'skips backfill for projects on unhealthy shards' do before do
missing_not_synced = create(:project, group: restricted_group) secondary.update!(selective_sync_type: 'namespaces', namespaces: [restricted_group])
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: restricted_group)
missing_dirty.update_column(:repository_storage, 'unknown')
create(:geo_project_registry, :synced, :repository_dirty, project: missing_dirty) allow(::Gitlab::Geo).to receive(:current_node).and_call_original
Rails.cache.write(:current_node, secondary.to_json)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
end
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, anything) it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_not_synced.id, anything) expect(Geo::ProjectSyncWorker).to receive(:perform_async)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_dirty.id, anything) .with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: true)
.once
.and_return(spy)
Sidekiq::Testing.inline! { subject.perform(shard_name) } subject.perform(shard_name)
end end
end
context 'number of scheduled jobs exceeds capacity' do it 'does not perform Geo::ProjectSyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
it 'schedules 0 jobs' do create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
is_expected.to receive(:scheduled_job_ids).and_return(1..1000).at_least(:once) create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) } expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
.once
.and_return(spy)
subject.perform(shard_name)
end
end end
end
context 'backoff time' do context 'all repositories fail' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" } let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
before do before do
allow(Rails.cache).to receive(:read).and_call_original # Neither of these are needed for this spec
allow(Rails.cache).to receive(:write).and_call_original unsynced_project.destroy
unsynced_project_in_restricted_group.destroy
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
end
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
allow_next_instance_of(Project) do |instance|
allow(instance).to receive(:ensure_repository).and_raise(Gitlab::Shell::Error.new('foo'))
end
allow_next_instance_of(Geo::ProjectRegistry) do |instance|
allow(instance).to receive(:wiki_sync_due?).and_return(false)
end
allow_next_instance_of(Geo::RepositorySyncService) do |instance|
allow(instance).to receive(:expire_repository_caches)
end
allow_next_instance_of(Geo::ProjectHousekeepingService) do |instance|
allow(instance).to receive(:do_housekeeping)
end
end
it 'tries to sync every project' do
project_list.each do |project|
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.at_least(:once)
.and_call_original
end
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
end end
it 'sets the back off time when there are no pending items' do context 'additional shards' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group) it 'skips backfill for projects on unhealthy shards' do
create(:geo_project_registry, :synced, project: unsynced_project) missing_not_synced = create(:project, group: restricted_group)
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: restricted_group)
missing_dirty.update_column(:repository_storage, 'unknown')
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once create(:geo_project_registry, :synced, :repository_dirty, project: missing_dirty)
subject.perform(shard_name) expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_not_synced.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_dirty.id, anything)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end end
it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do context 'projects that require resync' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true) context 'when project repository is dirty' do
it 'syncs repository only' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async) expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: true, sync_wiki: false)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
subject.perform(shard_name) subject.perform(shard_name)
end
end
context 'when project wiki is dirty' do
it 'syncs wiki only' do
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: false, sync_wiki: true)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: false, sync_wiki: true)
subject.perform(shard_name)
end
end
end end
end end
end end
......
...@@ -76,13 +76,15 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -76,13 +76,15 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
# Somewhat of an integration test # Somewhat of an integration test
it 'creates missing registries for each registry class' do it 'creates missing registries for each registry class' do
lfs_object = create(:lfs_object)
job_artifact = create(:ci_job_artifact) job_artifact = create(:ci_job_artifact)
lfs_object = create(:lfs_object)
project = create(:project)
upload = create(:upload) upload = create(:upload)
package_file = create(:conan_package_file, :conan_package) package_file = create(:conan_package_file, :conan_package)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0) expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0) expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
expect(Geo::ProjectRegistry.where(project_id: project.id).count).to eq(0)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0) expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(0) expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(0)
...@@ -90,13 +92,12 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -90,13 +92,12 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1) expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1) expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1)
expect(Geo::ProjectRegistry.where(project_id: project.id).count).to eq(1)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1) expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1) expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
end end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do context 'when geo_job_artifact_registry_ssot_sync is disabled' do
let_it_be(:job_artifact) { create(:ci_job_artifact) }
before do before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false) stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end end
...@@ -107,6 +108,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -107,6 +108,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'does not execute RegistryConsistencyService for Job Artifacts' do it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
...@@ -117,8 +119,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -117,8 +119,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end end
context 'when geo_file_registry_ssot_sync is disabled' do context 'when geo_file_registry_ssot_sync is disabled' do
let_it_be(:upload) { create(:upload) }
before do before do
stub_feature_flags(geo_file_registry_ssot_sync: false) stub_feature_flags(geo_file_registry_ssot_sync: false)
end end
...@@ -131,6 +131,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -131,6 +131,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::UploadRegistry, batch_size: 1000) expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::UploadRegistry, batch_size: 1000)
...@@ -138,6 +139,27 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -138,6 +139,27 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end end
end end
context 'when geo_project_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_project_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for projects' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000)
subject.perform
end
end
context 'when the current Geo node is disabled or primary' do context 'when the current Geo node is disabled or primary' do
before do before do
stub_primary_node stub_primary_node
......
...@@ -2,16 +2,31 @@ ...@@ -2,16 +2,31 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe GeoRepositoryDestroyWorker do RSpec.describe GeoRepositoryDestroyWorker, :geo do
describe '#perform' do describe '#perform' do
it 'delegates project removal to Geo::RepositoryDestroyService' do let(:project) { create(:project) }
project = create(:project)
expect_next_instance_of(Geo::RepositoryDestroyService) do |instance| context 'with an existing project' do
expect(instance).to receive(:execute) it 'delegates project removal to Geo::RepositoryDestroyService' do
expect_next_instance_of(Geo::RepositoryDestroyService) do |instance|
expect(instance).to receive(:execute)
end
subject.perform(project.id, project.name, project.path, 'default')
end end
end
context 'with project ID from an orphaned registry' do
it 'delegates project removal to Geo::RepositoryDestroyService' do
registry = create(:geo_project_registry, project_id: project.id)
project.delete
described_class.new.perform(project.id, project.name, project.path, 'default') expect_next_instance_of(Geo::RepositoryDestroyService) do |instance|
expect(instance).to receive(:execute)
end
subject.perform(registry.project_id)
end
end end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment