Add methods to find container registries to sync

parent d47f531a
...@@ -30,6 +30,42 @@ module Geo ...@@ -30,6 +30,42 @@ module Geo
[untracked_ids, unused_tracked_ids] [untracked_ids, unused_tracked_ids]
end end
# Returns Geo::ContainerRepositoryRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_dirty_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::ContainerRepositoryRegistry
.never_synced
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_dirty_registries(batch_size:, except_ids: [])
Geo::ContainerRepositoryRegistry
.failed
.retry_due
.model_id_not_in(except_ids)
.order(Gitlab::Database.nulls_first_order(:last_synced_at))
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Find limited amount of non replicated container repositories. # Find limited amount of non replicated container repositories.
# #
# You can pass a list with `except_repository_ids:` so you can exclude items you # You can pass a list with `except_repository_ids:` so you can exclude items you
...@@ -37,16 +73,16 @@ module Geo ...@@ -37,16 +73,16 @@ module Geo
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_repository_ids ids that will be ignored from the query # @param [Array<Integer>] except_repository_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_repository_ids: []) def find_unsynced(batch_size:, except_repository_ids: [])
container_repositories container_repositories
.missing_container_repository_registry .missing_container_repository_registry
.id_not_in(except_repository_ids) .id_not_in(except_repository_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_retryable_failed_ids(batch_size:, except_repository_ids: []) def find_retryable_failed_ids(batch_size:, except_repository_ids: [])
Geo::ContainerRepositoryRegistry Geo::ContainerRepositoryRegistry
.failed .failed
...@@ -55,7 +91,7 @@ module Geo ...@@ -55,7 +91,7 @@ module Geo
.limit(batch_size) .limit(batch_size)
.pluck_container_repository_key .pluck_container_repository_key
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
private private
......
...@@ -8,6 +8,7 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry ...@@ -8,6 +8,7 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
belongs_to :container_repository belongs_to :container_repository
scope :never_synced, -> { with_state(:pending).where(last_synced_at: nil) }
scope :failed, -> { with_state(:failed) } scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) } scope :synced, -> { with_state(:synced) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) } scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
......
...@@ -129,7 +129,7 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do ...@@ -129,7 +129,7 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do
end end
end end
describe '#find_registry_differences' do context 'non-fdw queries' do
let_it_be(:secondary) { create(:geo_node) } let_it_be(:secondary) { create(:geo_node) }
let_it_be(:synced_group) { create(:group) } let_it_be(:synced_group) { create(:group) }
let_it_be(:nested_group) { create(:group, parent: synced_group) } let_it_be(:nested_group) { create(:group, parent: synced_group) }
...@@ -143,140 +143,186 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do ...@@ -143,140 +143,186 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do
let_it_be(:container_repository_5) { create(:container_repository, project: project_broken_storage) } let_it_be(:container_repository_5) { create(:container_repository, project: project_broken_storage) }
let_it_be(:container_repository_6) { create(:container_repository, project: project_broken_storage) } let_it_be(:container_repository_6) { create(:container_repository, project: project_broken_storage) }
subject { described_class.new(current_node_id: secondary.id) }
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
end end
context 'untracked IDs' do describe '#find_registry_differences' do
before do context 'untracked IDs' do
create(:container_repository_registry, container_repository_id: container_repository_1.id) before do
create(:container_repository_registry, :sync_failed, container_repository_id: container_repository_3.id) create(:container_repository_registry, container_repository_id: container_repository_1.id)
create(:container_repository_registry, container_repository_id: container_repository_5.id) create(:container_repository_registry, :sync_failed, container_repository_id: container_repository_3.id)
end create(:container_repository_registry, container_repository_id: container_repository_5.id)
end
it 'includes container registries IDs without an entry on the tracking database' do it 'includes container registries IDs without an entry on the tracking database' do
range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id) range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id)
untracked_ids, _ = subject.find_registry_differences(range) untracked_ids, _ = subject.find_registry_differences(range)
expect(untracked_ids).to match_array([container_repository_2.id, container_repository_4.id, container_repository_6.id]) expect(untracked_ids).to match_array([container_repository_2.id, container_repository_4.id, container_repository_6.id])
end end
it 'excludes container registries outside the ID range' do it 'excludes container registries outside the ID range' do
untracked_ids, _ = subject.find_registry_differences(container_repository_4.id..container_repository_6.id) untracked_ids, _ = subject.find_registry_differences(container_repository_4.id..container_repository_6.id)
expect(untracked_ids).to match_array([container_repository_4.id, container_repository_6.id]) expect(untracked_ids).to match_array([container_repository_4.id, container_repository_6.id])
end end
context 'with selective sync by namespace' do context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) } let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'excludes container_registry IDs that projects are not in the selected namespaces' do it 'excludes container_registry IDs that projects are not in the selected namespaces' do
range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id) range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id)
untracked_ids, _ = subject.find_registry_differences(range) untracked_ids, _ = subject.find_registry_differences(range)
expect(untracked_ids).to match_array([container_repository_2.id]) expect(untracked_ids).to match_array([container_repository_2.id])
end
end end
end
context 'with selective sync by shard' do context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) } let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'excludes container_registry IDs that projects are not in the selected shards' do it 'excludes container_registry IDs that projects are not in the selected shards' do
range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id) range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id)
untracked_ids, _ = subject.find_registry_differences(range) untracked_ids, _ = subject.find_registry_differences(range)
expect(untracked_ids).to match_array([container_repository_6.id]) expect(untracked_ids).to match_array([container_repository_6.id])
end
end end
end end
end
context 'unused tracked IDs' do context 'unused tracked IDs' do
context 'with an orphaned registry' do context 'with an orphaned registry' do
let!(:orphaned) { create(:container_repository_registry, container_repository_id: container_repository_1.id) } let!(:orphaned) { create(:container_repository_registry, container_repository_id: container_repository_1.id) }
before do before do
container_repository_1.delete container_repository_1.delete
end end
it 'includes tracked IDs that do not exist in the model table' do it 'includes tracked IDs that do not exist in the model table' do
range = container_repository_1.id..container_repository_1.id range = container_repository_1.id..container_repository_1.id
_, unused_tracked_ids = subject.find_registry_differences(range) _, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([container_repository_1.id]) expect(unused_tracked_ids).to match_array([container_repository_1.id])
end end
it 'excludes IDs outside the ID range' do it 'excludes IDs outside the ID range' do
range = (container_repository_1.id + 1)..ContainerRepository.maximum(:id) range = (container_repository_1.id + 1)..ContainerRepository.maximum(:id)
_, unused_tracked_ids = subject.find_registry_differences(range) _, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty expect(unused_tracked_ids).to be_empty
end
end end
end
context 'with selective sync by namespace' do context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) } let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'with a tracked container_registry' do context 'with a tracked container_registry' do
context 'excluded from selective sync' do context 'excluded from selective sync' do
let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_3.id) } let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_3.id) }
it 'includes tracked container_registry IDs that exist but are not in a selectively synced project' do it 'includes tracked container_registry IDs that exist but are not in a selectively synced project' do
range = container_repository_3.id..container_repository_3.id range = container_repository_3.id..container_repository_3.id
_, unused_tracked_ids = subject.find_registry_differences(range) _, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([container_repository_3.id]) expect(unused_tracked_ids).to match_array([container_repository_3.id])
end
end end
end
context 'included in selective sync' do context 'included in selective sync' do
let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_1.id) } let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_1.id) }
it 'excludes tracked container_registry IDs that are in selectively synced projects' do it 'excludes tracked container_registry IDs that are in selectively synced projects' do
range = container_repository_1.id..container_repository_1.id range = container_repository_1.id..container_repository_1.id
_, unused_tracked_ids = subject.find_registry_differences(range) _, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty expect(unused_tracked_ids).to be_empty
end
end end
end end
end end
end
context 'with selective sync by shard' do context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) } let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
context 'with a tracked container_registry' do context 'with a tracked container_registry' do
let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_1.id) } let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_1.id) }
context 'excluded from selective sync' do context 'excluded from selective sync' do
it 'includes tracked container_registry IDs that exist but are not in a selectively synced project' do it 'includes tracked container_registry IDs that exist but are not in a selectively synced project' do
range = container_repository_1.id..container_repository_1.id range = container_repository_1.id..container_repository_1.id
_, unused_tracked_ids = subject.find_registry_differences(range) _, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([container_repository_1.id]) expect(unused_tracked_ids).to match_array([container_repository_1.id])
end
end end
end
context 'included in selective sync' do context 'included in selective sync' do
let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_5.id) } let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_5.id) }
it 'excludes tracked container_registry IDs that are in selectively synced projects' do it 'excludes tracked container_registry IDs that are in selectively synced projects' do
range = container_repository_5.id..container_repository_5.id range = container_repository_5.id..container_repository_5.id
_, unused_tracked_ids = subject.find_registry_differences(range) _, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty expect(unused_tracked_ids).to be_empty
end
end end
end end
end end
end end
end end
describe '#find_never_synced_registries' do
let!(:registry_container_registry_1) { create(:container_repository_registry, :synced, container_repository_id: container_repository_1.id) }
let!(:registry_container_registry_2) { create(:container_repository_registry, :sync_failed, container_repository_id: container_repository_2.id) }
let!(:registry_container_registry_3) { create(:container_repository_registry, container_repository_id: container_repository_3.id, last_synced_at: nil) }
let!(:registry_container_registry_4) { create(:container_repository_registry, container_repository_id: container_repository_4.id, last_synced_at: 3.days.ago, retry_at: 2.days.ago) }
let!(:registry_container_registry_5) { create(:container_repository_registry, container_repository_id: container_repository_5.id, last_synced_at: 6.days.ago) }
let!(:registry_container_registry_6) { create(:container_repository_registry, container_repository_id: container_repository_6.id, last_synced_at: nil) }
it 'returns registries for projects that have never been synced' do
registries = subject.find_never_synced_registries(batch_size: 10)
expect(registries).to match_ids(registry_container_registry_3, registry_container_registry_6)
end
it 'excludes except_ids' do
registries = subject.find_never_synced_registries(batch_size: 10, except_ids: [container_repository_3.id])
expect(registries).to match_ids(registry_container_registry_6)
end
end
describe '#find_retryable_dirty_registries' do
let!(:registry_container_registry_1) { create(:container_repository_registry, :synced, container_repository_id: container_repository_1.id) }
let!(:registry_container_registry_2) { create(:container_repository_registry, :sync_started, container_repository_id: container_repository_2.id) }
let!(:registry_container_registry_3) { create(:container_repository_registry, state: :failed, container_repository_id: container_repository_3.id, last_synced_at: nil) }
let!(:registry_container_registry_4) { create(:container_repository_registry, state: :failed, container_repository_id: container_repository_4.id, last_synced_at: 3.days.ago, retry_at: 2.days.ago) }
let!(:registry_container_registry_5) { create(:container_repository_registry, state: :failed, container_repository_id: container_repository_5.id, last_synced_at: 6.days.ago) }
let!(:registry_container_registry_6) { create(:container_repository_registry, state: :failed, container_repository_id: container_repository_6.id, last_synced_at: nil) }
it 'returns registries for projects that have been recently updated' do
registries = subject.find_retryable_dirty_registries(batch_size: 10)
expect(registries).to match_ids(registry_container_registry_3, registry_container_registry_4, registry_container_registry_5, registry_container_registry_6)
end
it 'excludes except_ids' do
registries = subject.find_retryable_dirty_registries(batch_size: 10, except_ids: [container_repository_4.id, container_repository_5.id, container_repository_6.id])
expect(registries).to match_ids(registry_container_registry_3)
end
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment