Commit b13250ef authored by Tiger Watson's avatar Tiger Watson

Merge branch '227680-geo-make-registry-tables-as-ssot-for-container-registry' into 'master'

Geo - Make Registry tables as SSOT for Container Registries

Closes #227680

See merge request gitlab-org/gitlab!37132
parents 452043cb 6ee2e50d
......@@ -3,59 +3,103 @@
module Geo
class ContainerRepositoryRegistryFinder < RegistryFinder
def count_syncable
container_repositories.count
current_node_non_fdw.container_repositories.count
end
def count_synced
registries_for_container_repositories
.merge(Geo::ContainerRepositoryRegistry.synced).count
registries.merge(Geo::ContainerRepositoryRegistry.synced).count
end
def count_failed
registries_for_container_repositories
.merge(Geo::ContainerRepositoryRegistry.failed).count
registries.merge(Geo::ContainerRepositoryRegistry.failed).count
end
def count_registry
Geo::ContainerRepositoryRegistry.count
end
# Compares the current node's container repositories against the registry
# rows that track them, restricted to the given range of ids.
#
# @param [Range] range ids to inspect on both sides
# @return [Array] two lists: ids with no registry row yet, then tracked ids
#   that no longer match a container repository for this node
def find_registry_differences(range)
  expected_ids = current_node_non_fdw.container_repositories.id_in(range).pluck_primary_key
  registered_ids = Geo::ContainerRepositoryRegistry.pluck_model_ids_in_range(range)

  [expected_ids - registered_ids, registered_ids - expected_ids]
end
# Returns Geo::ContainerRepositoryRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_dirty_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
  # Drop anything the caller already has in flight before capping the batch.
  pending_registries = Geo::ContainerRepositoryRegistry.never_synced.model_id_not_in(except_ids)

  pending_registries.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
# Returns failed Geo::ContainerRepositoryRegistry records whose retry is due,
# ordered by `last_synced_at` with NULLs first.
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
def find_retryable_dirty_registries(batch_size:, except_ids: [])
  retryable = Geo::ContainerRepositoryRegistry.failed.retry_due.model_id_not_in(except_ids)
  sync_order = Gitlab::Database.nulls_first_order(:last_synced_at)

  retryable.order(sync_order).limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Finds a limited number of non-replicated container repositories.
#
# You can pass a list with `except_repository_ids:` so you can exclude items you
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_repository_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_repository_ids: [])
container_repositories
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_ids: [])
current_node_fdw
.container_repositories
.missing_container_repository_registry
.id_not_in(except_repository_ids)
.id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop:enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_ids(batch_size:, except_repository_ids: [])
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_failed_ids(batch_size:, except_ids: [])
Geo::ContainerRepositoryRegistry
.failed
.retry_due
.repository_id_not_in(except_repository_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
.pluck_container_repository_key
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop:enable CodeReuse/ActiveRecord
private
def container_repositories
current_node.container_repositories
def registries
if Geo::ContainerRepositoryRegistry.registry_consistency_worker_enabled?
Geo::ContainerRepositoryRegistry.all
else
current_node_fdw.container_repositories.inner_join_container_repository_registry
end
def registries_for_container_repositories
container_repositories
.inner_join_container_repository_registry
end
end
end
......@@ -5,7 +5,7 @@ module EE
extend ActiveSupport::Concern
prepended do
scope :project_id_in, ->(ids) { joins(:project).merge(Project.id_in(ids)) }
scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) }
end
def push_blob(digest, file_path)
......
......@@ -3,9 +3,12 @@
class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
include ::Delay
MODEL_CLASS = ::ContainerRepository
MODEL_FOREIGN_KEY = :container_repository_id
belongs_to :container_repository
scope :repository_id_not_in, -> (ids) { where.not(container_repository_id: ids) }
scope :never_synced, -> { with_state(:pending).where(last_synced_at: nil) }
scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
......@@ -34,10 +37,28 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
end
end
# @return [Class] the finder class used to query this registry
def self.finder_class
::Geo::ContainerRepositoryRegistryFinder
end
# Builds a finder for the current Geo node and delegates the comparison of
# source records and registry rows within `range` to it.
#
# @param [Range] range ids passed through to the finder
def self.find_registry_differences(range)
  finder = finder_class.new(current_node_id: Gitlab::Geo.current_node.id)

  finder.find_registry_differences(range)
end
# Deletes the registry rows whose `container_repository_id` is in the list.
#
# @param [Array<Integer>] container_repository_ids ids whose registry rows are removed
# @return [Array<Integer>] the same ids that were passed in
def self.delete_for_model_ids(container_repository_ids)
  container_repository_ids.tap do |ids|
    where(container_repository_id: ids).delete_all
  end
end
# Plucks the `container_repository_id` column.
# NOTE(review): `where(nil)` presumably just yields the current relation so
# this can be chained onto other scopes — confirm.
def self.pluck_container_repository_key
where(nil).pluck(:container_repository_id)
end
# Whether the `geo_container_registry_ssot_sync` feature flag is enabled.
# The flag is checked with `default_enabled: true`.
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_container_registry_ssot_sync, default_enabled: true)
end
# Reads the `geo.registry_replication.enabled` setting from the GitLab config.
def self.replication_enabled?
Gitlab.config.geo.registry_replication.enabled
end
......
......@@ -257,6 +257,7 @@ class GeoNode < ApplicationRecord
end
def container_repositories
return ContainerRepository.none unless Geo::ContainerRepositoryRegistry.replication_enabled?
return ContainerRepository.all unless selective_sync?
ContainerRepository.project_id_in(projects)
......
......@@ -28,35 +28,49 @@ module Geo
{ id: repository_id, job_id: job_id } if job_id
end
# Collects the `:id` of every entry in `scheduled_jobs`.
def scheduled_repository_ids
  scheduled_jobs.map { |job| job[:id] }
end
# Polls for new resources to be transferred
#
# @return [Array] resources to be transferred
def load_pending_resources
resources = find_unsynced_repositories(batch_size: db_retrieve_batch_size)
resources = find_container_repository_ids_not_synced(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero?
resources
else
resources + find_retryable_failed_repositories(batch_size: remaining_capacity)
resources + find_retryable_container_registry_ids(batch_size: remaining_capacity)
end
end
def find_unsynced_repositories(batch_size:)
Geo::ContainerRepositoryRegistryFinder
.new(current_node_id: current_node.id)
.find_unsynced(batch_size: batch_size, except_repository_ids: scheduled_repository_ids)
# Picks the next batch of container repository ids that still need a first sync.
#
# When the registry consistency worker is enabled the ids come from
# never-synced registry rows; otherwise they come from the finder's
# `find_unsynced` query. Ids that already have a scheduled job are excluded.
#
# @param [Integer] batch_size used to limit the results returned
def find_container_repository_ids_not_synced(batch_size:)
  if Geo::ContainerRepositoryRegistry.registry_consistency_worker_enabled?
    never_synced = registry_finder.find_never_synced_registries(
      batch_size: batch_size,
      except_ids: scheduled_repository_ids
    )
    return never_synced.pluck_model_foreign_key
  end

  untracked = registry_finder.find_unsynced(
    batch_size: batch_size,
    except_ids: scheduled_repository_ids
  )
  untracked.pluck_primary_key
end
def find_retryable_failed_repositories(batch_size:)
Geo::ContainerRepositoryRegistryFinder
.new(current_node_id: current_node.id)
.find_retryable_failed_ids(batch_size: batch_size, except_repository_ids: scheduled_repository_ids)
# Picks the next batch of container repository ids whose failed sync may be retried.
#
# When the registry consistency worker is enabled the ids are plucked from
# the finder's retryable dirty registries; otherwise the finder's
# `find_retryable_failed_ids` result is returned as is. Ids that already
# have a scheduled job are excluded.
#
# @param [Integer] batch_size used to limit the results returned
def find_retryable_container_registry_ids(batch_size:)
  unless Geo::ContainerRepositoryRegistry.registry_consistency_worker_enabled?
    return registry_finder.find_retryable_failed_ids(
      batch_size: batch_size,
      except_ids: scheduled_repository_ids
    )
  end

  retryable = registry_finder.find_retryable_dirty_registries(
    batch_size: batch_size,
    except_ids: scheduled_repository_ids
  )
  retryable.pluck_model_foreign_key
end
def scheduled_repository_ids
scheduled_jobs.map { |data| data[:id] }
# Memoized Geo::ContainerRepositoryRegistryFinder built with the current node's id.
def registry_finder
@registry_finder ||= Geo::ContainerRepositoryRegistryFinder.new(current_node_id: current_node.id)
end
end
end
......@@ -16,6 +16,7 @@ module Geo
feature_category :geo_replication
REGISTRY_CLASSES = [
Geo::ContainerRepositoryRegistry,
Geo::DesignRegistry,
Geo::JobArtifactRegistry,
Geo::LfsObjectRegistry,
......
# frozen_string_literal: true
FactoryBot.define do
factory :container_repository_registry, class: 'Geo::ContainerRepositoryRegistry' do
factory :geo_container_repository_registry, aliases: [:container_repository_registry], class: 'Geo::ContainerRepositoryRegistry' do
container_repository
last_sync_failure { nil }
last_synced_at { nil }
......
......@@ -14,22 +14,6 @@ RSpec.describe Geo::ContainerRepositoryRegistry, :geo do
it { is_expected.to belong_to(:container_repository) }
end
describe 'scopes' do
describe '.repository_id_not_in' do
it 'returns registries scoped by ids' do
registry1 = create(:container_repository_registry)
registry2 = create(:container_repository_registry)
container_repository1_id = registry1.container_repository_id
container_repository2_id = registry2.container_repository_id
result = described_class.repository_id_not_in([container_repository1_id, container_repository2_id])
expect(result).to match_ids([registry])
end
end
end
it_behaves_like 'a Geo registry' do
let(:registry) { create(:container_repository_registry) }
end
......
......@@ -759,6 +759,60 @@ RSpec.describe GeoNode, :request_store, :geo, type: :model do
end
end
describe '#container_repositories' do
let_it_be(:synced_group) { create(:group) }
let_it_be(:nested_group) { create(:group, parent: synced_group) }
let_it_be(:synced_project) { create(:project, group: synced_group) }
let_it_be(:synced_project_in_nested_group) { create(:project, group: nested_group) }
let_it_be(:unsynced_project) { create(:project) }
let_it_be(:project_broken_storage) { create(:project, :broken_storage) }
let_it_be(:container_repository_1) { create(:container_repository, project: synced_project) }
let_it_be(:container_repository_2) { create(:container_repository, project: synced_project_in_nested_group) }
let_it_be(:container_repository_3) { create(:container_repository, project: unsynced_project) }
let_it_be(:container_repository_4) { create(:container_repository, project: project_broken_storage) }
before do
stub_registry_replication_config(enabled: true)
end
context 'with registry replication disabled' do
before do
stub_registry_replication_config(enabled: false)
end
it 'returns an empty relation' do
expect(node.container_repositories).to be_empty
end
end
context 'without selective sync' do
it 'returns all container repositories' do
expect(node.container_repositories).to match_array([container_repository_1, container_repository_2, container_repository_3, container_repository_4])
end
end
context 'with selective sync by namespace' do
before do
node.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'excludes container repositories that are not in selectively synced projects' do
expect(node.container_repositories).to match_array([container_repository_1, container_repository_2])
end
end
context 'with selective sync by shard' do
before do
node.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
end
it 'excludes container repositories that are not in selectively synced shards' do
expect(node.container_repositories).to match_array([container_repository_4])
end
end
end
describe '#job_artifacts' do
context 'when selective sync is enabled' do
it 'applies a CTE statement' do
......
......@@ -9,6 +9,7 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
before do
stub_current_geo_node(secondary)
stub_registry_replication_config(enabled: true)
end
def model_class_factory_name(registry_class)
......
......@@ -2,7 +2,7 @@
require 'spec_helper'
RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_for_tracking_db do
RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :use_sql_query_cache_for_tracking_db do
include ::EE::GeoHelpers
include ExclusiveLeaseHelpers
......@@ -12,10 +12,11 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
before do
stub_current_geo_node(secondary)
stub_exclusive_lease(renew: true)
stub_registry_replication_config(enabled: true)
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:over_time?).and_return(false)
end
stub_registry_replication_config(enabled: true)
end
it 'does not schedule anything when tracking database is not configured' do
......@@ -33,30 +34,39 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
end
it 'does not schedule anything when node is disabled' do
secondary.update!(enabled: false)
create(:container_repository)
secondary.enabled = false
secondary.save
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)
subject.perform
end
context 'Sync condition' do
let(:container_repository) { create(:container_repository) }
it 'does not schedule anything when registry replication is disabled' do
  stub_registry_replication_config(enabled: false)
  create(:container_repository)

  expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)

  # The worker has to run, otherwise the negative expectation above is
  # trivially satisfied and the example can never fail.
  subject.perform
end
context 'when geo_container_registry_ssot_sync is disabled', :geo_fdw do
before do
stub_feature_flags(geo_container_registry_ssot_sync: false)
end
it 'performs Geo::ContainerRepositorySyncWorker' do
container_repository = create(:container_repository)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(container_repository.id)
subject.perform
end
it 'performs Geo::ContainerRepositorySyncWorker for failed syncs' do
container_repository_registry = create(:container_repository_registry, :sync_failed)
registry = create(:container_repository_registry, :sync_failed)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
.with(container_repository_registry.container_repository_id).once.and_return(spy)
.with(registry.container_repository_id).once.and_return(spy)
subject.perform
end
......@@ -70,15 +80,14 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
end
context 'with a failed sync' do
let(:failed_registry) { create(:container_repository_registry, :sync_failed) }
it 'does not stall backfill' do
unsynced = create(:container_repository)
failed_registry = create(:container_repository_registry, :sync_failed)
unsynced_container_repository = create(:container_repository)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async).with(failed_registry.container_repository_id)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(unsynced.id)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(unsynced_container_repository.id)
subject.perform
end
......@@ -101,7 +110,6 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
subject.perform
end
end
end
context 'when node has namespace restrictions', :request_store do
let(:synced_group) { create(:group) }
......@@ -110,14 +118,9 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
allow(ProjectCacheWorker).to receive(:perform_async).and_return(true)
allow(::Gitlab::Geo).to receive(:current_node).and_call_original
Rails.cache.write(:current_node, secondary.to_json)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
end
it 'does not perform Geo::ContainerRepositorySyncWorker for repositories that does not belong to selected namespaces ' do
it 'does not perform Geo::ContainerRepositorySyncWorker for repositories that does not belong to selected namespaces' do
container_repository = create(:container_repository, project: project_in_synced_group)
create(:container_repository, project: unsynced_project)
......@@ -127,4 +130,68 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
subject.perform
end
end
end
context 'when geo_container_registry_ssot_sync is enabled' do
before do
stub_feature_flags(geo_container_registry_ssot_sync: true)
end
it 'performs Geo::ContainerRepositorySyncWorker' do
registry = create(:container_repository_registry)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(registry.container_repository_id)
subject.perform
end
it 'performs Geo::ContainerRepositorySyncWorker for failed syncs' do
registry = create(:container_repository_registry, :sync_failed)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
.with(registry.container_repository_id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::ContainerRepositorySyncWorker for synced repositories' do
create(:container_repository_registry, :synced)
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)
subject.perform
end
context 'with a failed sync' do
it 'does not stall backfill' do
failed_registry = create(:container_repository_registry, :sync_failed)
unsynced_registry = create(:container_repository_registry)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async).with(failed_registry.container_repository_id)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(unsynced_registry.container_repository_id)
subject.perform
end
it 'does not retry failed files when retry_at is tomorrow' do
  # A registry whose retry time is still in the future must be left alone.
  failed_registry = create(:container_repository_registry, :sync_failed, retry_at: Date.tomorrow)

  expect(Geo::ContainerRepositorySyncWorker)
    .not_to receive(:perform_async).with(failed_registry.container_repository_id)

  subject.perform
end
it 'retries failed files when retry_at is in the past' do
failed_registry = create(:container_repository_registry, :sync_failed, retry_at: Date.yesterday)
expect(Geo::ContainerRepositorySyncWorker)
.to receive(:perform_async).with(failed_registry.container_repository_id)
subject.perform
end
end
end
end
......@@ -2,20 +2,21 @@
require 'spec_helper'
RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo do
include EE::GeoHelpers
include ExclusiveLeaseHelpers
let(:primary) { create(:geo_node, :primary) }
let(:secondary) { create(:geo_node) }
let_it_be(:primary) { create(:geo_node, :primary) }
let_it_be(:secondary) { create(:geo_node) }
let(:worker_class) { described_class }
let(:batch_size) { described_class::BATCH_SIZE }
before do
stub_current_geo_node(secondary)
stub_registry_replication_config(enabled: true)
end
let(:worker_class) { described_class }
let(:batch_size) { described_class::BATCH_SIZE }
it_behaves_like 'reenqueuer'
it 'uses a cronjob queue' do
......@@ -84,6 +85,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
upload = create(:upload)
package_file = create(:conan_package_file, :conan_package)
vulnerability_export = create(:vulnerability_export, :with_csv_file)
container_repository = create(:container_repository, project: project)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
......@@ -92,6 +94,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(0)
expect(Geo::VulnerabilityExportRegistry.where(vulnerability_export_id: vulnerability_export.id).count).to eq(0)
expect(Geo::ContainerRepositoryRegistry.where(container_repository_id: container_repository.id).count).to eq(0)
subject.perform
......@@ -102,6 +105,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
expect(Geo::VulnerabilityExportRegistry.where(vulnerability_export_id: vulnerability_export.id).count).to eq(1)
expect(Geo::ContainerRepositoryRegistry.where(container_repository_id: container_repository.id).count).to eq(1)
end
context 'when geo_design_registry_ssot_sync is disabled' do
......@@ -120,6 +124,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::VulnerabilityExportRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ContainerRepositoryRegistry, batch_size: batch_size).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::DesignRegistry, batch_size: batch_size)
......@@ -127,6 +132,30 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end
end
context 'when geo_container_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_container_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for container repositories' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::DesignRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::VulnerabilityExportRegistry, batch_size: batch_size).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::ContainerRepositoryRegistry, batch_size: batch_size)
subject.perform
end
end
context 'when the current Geo node is disabled or primary' do
before do
stub_primary_node
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment