Add selective sync support for the FDW queries to count job artifacts

parent cbe5d29b
...@@ -7,31 +7,19 @@ module Geo ...@@ -7,31 +7,19 @@ module Geo
end end
def count_synced def count_synced
if aggregate_pushdown_supported? job_artifacts_synced.count
find_synced.count
else
legacy_find_synced.count
end
end end
def count_failed def count_failed
if aggregate_pushdown_supported? job_artifacts_failed.count
find_failed.count
else
legacy_find_failed.count
end
end end
def count_synced_missing_on_primary def count_synced_missing_on_primary
if aggregate_pushdown_supported? job_artifacts_synced_missing_on_primary.count
find_synced_missing_on_primary.count
else
legacy_find_synced_missing_on_primary.count
end
end end
def count_registry def count_registry
Geo::JobArtifactRegistry.count registries_for_job_artifacts.count
end end
def syncable def syncable
...@@ -82,7 +70,8 @@ module Geo ...@@ -82,7 +70,8 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_artifact_ids: []) def find_retryable_failed_registries(batch_size:, except_artifact_ids: [])
find_failed_registries Geo::JobArtifactRegistry
.failed
.retry_due .retry_due
.artifact_id_not_in(except_artifact_ids) .artifact_id_not_in(except_artifact_ids)
.limit(batch_size) .limit(batch_size)
...@@ -91,7 +80,9 @@ module Geo ...@@ -91,7 +80,9 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_artifact_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_artifact_ids: [])
find_synced_missing_on_primary_registries Geo::JobArtifactRegistry
.synced
.missing_on_primary
.retry_due .retry_due
.artifact_id_not_in(except_artifact_ids) .artifact_id_not_in(except_artifact_ids)
.limit(batch_size) .limit(batch_size)
...@@ -118,46 +109,38 @@ module Geo ...@@ -118,46 +109,38 @@ module Geo
end end
end end
def find_synced def registries_for_job_artifacts
if use_legacy_queries? if use_legacy_queries_for_selective_sync?
legacy_find_synced legacy_finder.registries_for_job_artifacts
else else
fdw_find.merge(find_synced_registries) job_artifacts
.inner_join_job_artifact_registry
.syncable
end end
end end
def find_synced_missing_on_primary def job_artifacts_synced
if use_legacy_queries? if use_legacy_queries_for_selective_sync?
legacy_find_synced_missing_on_primary legacy_finder.job_artifacts_synced
else else
fdw_find.merge(find_synced_missing_on_primary_registries) registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.synced)
end end
end end
def find_failed def job_artifacts_failed
if use_legacy_queries? if use_legacy_queries_for_selective_sync?
legacy_find_failed legacy_finder.job_artifacts_failed
else else
fdw_find.merge(find_failed_registries) registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.failed)
end
end end
def find_synced_registries
Geo::JobArtifactRegistry.synced
end end
def find_synced_missing_on_primary_registries def job_artifacts_synced_missing_on_primary
find_synced_registries.missing_on_primary if use_legacy_queries_for_selective_sync?
legacy_finder.job_artifacts_synced_missing_on_primary
else
registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.synced.missing_on_primary)
end end
def find_failed_registries
Geo::JobArtifactRegistry.failed
end
def fdw_find
job_artifacts
.inner_join_job_artifact_registry
.syncable
end end
def fdw_find_unsynced(except_artifact_ids:) def fdw_find_unsynced(except_artifact_ids:)
...@@ -175,22 +158,6 @@ module Geo ...@@ -175,22 +158,6 @@ module Geo
.merge(Geo::JobArtifactRegistry.all) .merge(Geo::JobArtifactRegistry.all)
end end
def legacy_find_synced
legacy_inner_join_registry_ids(
syncable,
find_synced_registries.pluck_artifact_key,
Ci::JobArtifact
)
end
def legacy_find_failed
legacy_inner_join_registry_ids(
syncable,
find_failed_registries.pluck_artifact_key,
Ci::JobArtifact
)
end
def legacy_find_unsynced(except_artifact_ids:) def legacy_find_unsynced(except_artifact_ids:)
registry_artifact_ids = Geo::JobArtifactRegistry.pluck_artifact_key | except_artifact_ids registry_artifact_ids = Geo::JobArtifactRegistry.pluck_artifact_key | except_artifact_ids
...@@ -210,13 +177,5 @@ module Geo ...@@ -210,13 +177,5 @@ module Geo
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable,
find_synced_missing_on_primary_registries.pluck_artifact_key,
Ci::JobArtifact
)
end
end end
end end
...@@ -13,5 +13,40 @@ module Geo ...@@ -13,5 +13,40 @@ module Geo
Ci::JobArtifact.all Ci::JobArtifact.all
end end
end end
def job_artifacts_synced
legacy_inner_join_registry_ids(
syncable,
Geo::JobArtifactRegistry.synced.pluck_artifact_key,
Ci::JobArtifact
)
end
def job_artifacts_failed
legacy_inner_join_registry_ids(
syncable,
Geo::JobArtifactRegistry.failed.pluck_artifact_key,
Ci::JobArtifact
)
end
def job_artifacts_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable,
Geo::JobArtifactRegistry.synced.missing_on_primary.pluck_artifact_key,
Ci::JobArtifact
)
end
def registries_for_job_artifacts
return Geo::JobArtifactRegistry.all unless selective_sync?
legacy_inner_join_registry_ids(
Geo::JobArtifactRegistry.all,
job_artifacts.pluck_primary_key,
Geo::JobArtifactRegistry,
foreign_key: :artifact_id
)
end
end end
end end
...@@ -11,6 +11,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -11,6 +11,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do
let(:synced_group) { create(:group) } let(:synced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) } let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) } let(:unsynced_project) { create(:project) }
let(:project_broken_storage) { create(:project, :broken_storage) }
let(:job_artifact_1) { create(:ci_job_artifact, project: synced_project) } let(:job_artifact_1) { create(:ci_job_artifact, project: synced_project) }
let(:job_artifact_2) { create(:ci_job_artifact, project: unsynced_project) } let(:job_artifact_2) { create(:ci_job_artifact, project: unsynced_project) }
...@@ -26,32 +27,51 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -26,32 +27,51 @@ describe Geo::JobArtifactRegistryFinder, :geo do
stub_artifacts_object_storage stub_artifacts_object_storage
end end
it 'responds to file registry finder methods' do
file_registry_finder_methods = %i{
syncable
count_syncable
count_synced
count_failed
count_synced_missing_on_primary
count_registry
find_unsynced
find_migrated_local
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
}
file_registry_finder_methods.each do |method|
expect(subject).to respond_to(method)
end
end
shared_examples 'counts all the things' do shared_examples 'counts all the things' do
describe '#count_syncable' do describe '#count_syncable' do
before do let!(:job_artifact_1) { create(:ci_job_artifact, project: synced_project) }
job_artifact_1 let!(:job_artifact_2) { create(:ci_job_artifact, project: unsynced_project) }
job_artifact_2 let!(:job_artifact_3) { create(:ci_job_artifact, project: synced_project) }
job_artifact_3 let!(:job_artifact_4) { create(:ci_job_artifact, project: unsynced_project) }
job_artifact_4 let!(:job_artifact_5) { create(:ci_job_artifact, project: project_broken_storage) }
end let!(:job_artifact_6) { create(:ci_job_artifact, project: project_broken_storage) }
it 'counts job artifacts' do it 'counts job artifacts' do
expect(subject.count_syncable).to eq 4 expect(subject.count_syncable).to eq 6
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
job_artifact_1.update_column(:file_store, ObjectStorage::Store::REMOTE) job_artifact_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_syncable).to eq 3 expect(subject.count_syncable).to eq 5
end end
it 'ignores expired job artifacts' do it 'ignores expired job artifacts' do
job_artifact_1.update_column(:expire_at, Date.yesterday) job_artifact_1.update_column(:expire_at, Date.yesterday)
expect(subject.count_syncable).to eq 3 expect(subject.count_syncable).to eq 5
end end
context 'with selective sync' do context 'with selective sync by namespace' do
before do before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
...@@ -72,168 +92,198 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -72,168 +92,198 @@ describe Geo::JobArtifactRegistryFinder, :geo do
expect(subject.count_syncable).to eq 1 expect(subject.count_syncable).to eq 1
end end
end end
context 'with selective sync by shard' do
before do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
end end
describe '#count_synced' do it 'counts job artifacts' do
it 'delegates to #legacy_find_synced' do expect(subject.count_syncable).to eq 2
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false) end
expect(subject).to receive(:legacy_find_synced).and_call_original it 'ignores remote job artifacts' do
job_artifact_5.update_column(:file_store, ObjectStorage::Store::REMOTE)
subject.count_synced expect(subject.count_syncable).to eq 1
end end
it 'delegates to #find_synced for PostgreSQL 10' do it 'ignores expired job artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true) job_artifact_5.update_column(:expire_at, Date.yesterday)
expect(subject).to receive(:find_synced).and_call_original
subject.count_synced expect(subject.count_syncable).to eq 1
end
end
end end
it 'counts job artifacts that have been synced' do describe '#count_synced' do
let!(:job_artifact_1) { create(:ci_job_artifact, project: synced_project) }
let!(:job_artifact_2) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_3) { create(:ci_job_artifact, project: synced_project) }
let!(:job_artifact_4) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_5) { create(:ci_job_artifact, project: project_broken_storage) }
let!(:job_artifact_6) { create(:ci_job_artifact, project: project_broken_storage) }
context 'without selective sync' do
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
end
it 'counts job artifacts that have been synced' do
expect(subject.count_synced).to eq 2 expect(subject.count_synced).to eq 2
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id) job_artifact_2.update_column(:file_store, ObjectStorage::Store::REMOTE)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_synced).to eq 2 expect(subject.count_synced).to eq 1
end end
it 'ignores expired job artifacts' do it 'ignores expired job artifacts' do
job_artifact_1.update_column(:expire_at, Date.yesterday) job_artifact_2.update_column(:expire_at, Date.yesterday)
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_synced).to eq 2 expect(subject.count_synced).to eq 1
end
end end
context 'with selective sync' do context 'with selective sync by namespace' do
before do before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'delegates to #legacy_find_synced' do
expect(subject).to receive(:legacy_find_synced).and_call_original
subject.count_synced
end
it 'counts job artifacts that has been synced' do create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
end
expect(subject.count_synced).to eq 1 it 'counts job artifacts that has been synced' do
expect(subject.count_synced).to eq 2
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id) job_artifact_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_synced).to eq 1 expect(subject.count_synced).to eq 1
end end
it 'ignores expired job artifacts' do it 'ignores expired job artifacts' do
job_artifact_1.update_column(:expire_at, Date.yesterday) job_artifact_1.update_column(:expire_at, Date.yesterday)
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_synced).to eq 1 expect(subject.count_synced).to eq 1
end end
end end
end
describe '#count_failed' do context 'with selective sync by shard' do
it 'delegates to #legacy_find_failed' do before do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false) secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
expect(subject).to receive(:legacy_find_failed).and_call_original create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_5.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_6.id)
end
subject.count_failed it 'counts job artifacts that has been synced' do
expect(subject.count_synced).to eq 2
end end
it 'delegates to #find_failed' do it 'ignores remote job artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true) job_artifact_5.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_synced).to eq 1
end
expect(subject).to receive(:find_failed).and_call_original it 'ignores expired job artifacts' do
job_artifact_5.update_column(:expire_at, Date.yesterday)
subject.count_failed expect(subject.count_synced).to eq 1
end
end
end end
it 'counts job artifacts that sync has failed' do describe '#count_failed' do
let!(:job_artifact_1) { create(:ci_job_artifact, project: synced_project) }
let!(:job_artifact_2) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_3) { create(:ci_job_artifact, project: synced_project) }
let!(:job_artifact_4) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_5) { create(:ci_job_artifact, project: project_broken_storage) }
let!(:job_artifact_6) { create(:ci_job_artifact, project: project_broken_storage) }
context 'without selective sync' do
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_5.id, success: false)
end
expect(subject.count_failed).to eq 2 it 'counts job artifacts that sync has failed' do
expect(subject.count_failed).to eq 3
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id, success: false) job_artifact_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
expect(subject.count_failed).to eq 2 expect(subject.count_failed).to eq 2
end end
it 'ignores expired job artifacts' do it 'ignores expired job artifacts' do
job_artifact_1.update_column(:expire_at, Date.yesterday) job_artifact_1.update_column(:expire_at, Date.yesterday)
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
expect(subject.count_failed).to eq 2 expect(subject.count_failed).to eq 2
end end
end
context 'with selective sync' do context 'with selective sync by namespace' do
before do before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'delegates to #legacy_find_failed' do
expect(subject).to receive(:legacy_find_failed).and_call_original
subject.count_failed create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_5.id, success: false)
end end
it 'counts job artifacts that sync has failed' do it 'counts job artifacts that sync has failed' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false) expect(subject.count_failed).to eq 2
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id) end
it 'ignores remote job artifacts' do
job_artifact_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_failed).to eq 1 expect(subject.count_failed).to eq 1
end end
it 'does not count job artifacts of unsynced projects' do it 'ignores expired job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false) job_artifact_1.update_column(:expire_at, Date.yesterday)
expect(subject.count_failed).to eq 0 expect(subject.count_failed).to eq 1
end
end end
it 'ignores remote job artifacts' do context 'with selective sync by shard' do
job_artifact_1.update_column(:file_store, ObjectStorage::Store::REMOTE) before do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_5.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_6.id, success: false)
end
it 'counts job artifacts that sync has failed' do
expect(subject.count_failed).to eq 2
end
it 'ignores remote job artifacts' do
job_artifact_5.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_failed).to eq 1 expect(subject.count_failed).to eq 1
end end
it 'ignores expired job artifacts' do it 'ignores expired job artifacts' do
job_artifact_1.update_column(:expire_at, Date.yesterday) job_artifact_5.update_column(:expire_at, Date.yesterday)
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
expect(subject.count_failed).to eq 1 expect(subject.count_failed).to eq 1
end end
...@@ -241,96 +291,142 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -241,96 +291,142 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
describe '#count_synced_missing_on_primary' do describe '#count_synced_missing_on_primary' do
it 'delegates to #legacy_find_synced_missing_on_primary' do let!(:job_artifact_1) { create(:ci_job_artifact, project: synced_project) }
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false) let!(:job_artifact_2) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_3) { create(:ci_job_artifact, project: synced_project) }
expect(subject).to receive(:legacy_find_synced_missing_on_primary).and_call_original let!(:job_artifact_4) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_5) { create(:ci_job_artifact, project: project_broken_storage) }
subject.count_synced_missing_on_primary let!(:job_artifact_6) { create(:ci_job_artifact, project: project_broken_storage) }
context 'without selective sync' do
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false, missing_on_primary: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_4.id, missing_on_primary: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_5.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_6.id)
end end
it 'delegates to #find_synced_missing_on_primary for PostgreSQL 10' do it 'counts job artifacts that have been synced and are missing on the primary' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true) expect(subject.count_synced_missing_on_primary).to eq 2
end
expect(subject).to receive(:find_synced_missing_on_primary).and_call_original it 'ignores remote job artifacts' do
job_artifact_3.update_column(:file_store, ObjectStorage::Store::REMOTE)
subject.count_synced_missing_on_primary expect(subject.count_synced_missing_on_primary).to eq 1
end end
it 'counts job artifacts that have been synced and are missing on the primary' do it 'ignores expired job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, missing_on_primary: true) job_artifact_3.update_column(:expire_at, Date.yesterday)
expect(subject.count_synced_missing_on_primary).to eq 1 expect(subject.count_synced_missing_on_primary).to eq 1
end end
end
it 'excludes job artifacts that are not missing on the primary' do context 'with selective sync by namespace' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id) before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
expect(subject.count_synced_missing_on_primary).to eq 0 create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_4.id, missing_on_primary: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_5.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_6.id)
end end
it 'excludes job artifacts that are not synced' do it 'counts job artifacts that have been synced and are missing on the primary' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false, missing_on_primary: true) expect(subject.count_synced_missing_on_primary).to eq 2
expect(subject.count_synced_missing_on_primary).to eq 0
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id, missing_on_primary: true) job_artifact_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_synced_missing_on_primary).to eq 0 expect(subject.count_synced_missing_on_primary).to eq 1
end end
it 'ignores expired job artifacts' do it 'ignores expired job artifacts' do
job_artifact_1.update_column(:expire_at, Date.yesterday) job_artifact_1.update_column(:expire_at, Date.yesterday)
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary).to eq 0 expect(subject.count_synced_missing_on_primary).to eq 1
end end
context 'with selective sync' do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #legacy_find_synced_missing_on_primary' do context 'with selective sync by shard' do
expect(subject).to receive(:legacy_find_synced_missing_on_primary).and_call_original before do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
subject.count_synced_missing_on_primary
end
it 'counts job artifacts that has been synced' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, missing_on_primary: true) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, missing_on_primary: true) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, missing_on_primary: true) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_4.id, missing_on_primary: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_5.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_6.id, missing_on_primary: true)
end
it 'counts job artifacts that have been synced and are missing on the primary' do
expect(subject.count_synced_missing_on_primary).to eq 2 expect(subject.count_synced_missing_on_primary).to eq 2
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id, missing_on_primary: true) job_artifact_5.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_synced_missing_on_primary).to eq 0 expect(subject.count_synced_missing_on_primary).to eq 1
end end
it 'ignores expired job artifacts' do it 'ignores expired job artifacts' do
job_artifact_1.update_column(:expire_at, Date.yesterday) job_artifact_5.update_column(:expire_at, Date.yesterday)
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary).to eq 0 expect(subject.count_synced_missing_on_primary).to eq 1
end end
end end
end end
describe '#count_registry' do
let!(:job_artifact_1) { create(:ci_job_artifact, project: synced_project) }
let!(:job_artifact_2) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_3) { create(:ci_job_artifact, project: synced_project) }
let!(:job_artifact_4) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_5) { create(:ci_job_artifact, project: project_broken_storage) }
let!(:job_artifact_6) { create(:ci_job_artifact, project: project_broken_storage) }
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_4.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_6.id)
end end
shared_examples 'finds all the things' do it 'counts file registries for job artifacts' do
describe '#find_unsynced' do expect(subject.count_registry).to eq 4
it 'delegates to the correct method' do end
expect(subject).to receive("#{method_prefix}_find_unsynced".to_sym).and_call_original
context 'with selective sync by namespace' do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'counts file registries for job artifacts' do
expect(subject.count_registry).to eq 2
end
end
subject.find_unsynced(batch_size: 10) context 'with selective sync by shard' do
before do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
end end
it 'counts file registries for job artifacts' do
expect(subject.count_registry).to eq 1
end
end
end
end
shared_examples 'finds all the things' do
describe '#find_unsynced' do
it 'returns job artifacts without an entry on the tracking database' do it 'returns job artifacts without an entry on the tracking database' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: true) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
...@@ -367,12 +463,6 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -367,12 +463,6 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
describe '#find_migrated_local' do describe '#find_migrated_local' do
it 'delegates to the correct method' do
expect(subject).to receive("#{method_prefix}_find_migrated_local".to_sym).and_call_original
subject.find_migrated_local(batch_size: 10)
end
it 'returns job artifacts remotely and successfully synced locally' do it 'returns job artifacts remotely and successfully synced locally' do
job_artifact = create(:ci_job_artifact, :remote_store, project: synced_project) job_artifact = create(:ci_job_artifact, :remote_store, project: synced_project)
create(:geo_job_artifact_registry, artifact_id: job_artifact.id) create(:geo_job_artifact_registry, artifact_id: job_artifact.id)
...@@ -418,5 +508,34 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -418,5 +508,34 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
end end
it_behaves_like 'a file registry finder' # Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :geo_fdw, :delete do
context 'with use_fdw_queries_for_selective_sync disabled' do
before do
stub_feature_flags(use_fdw_queries_for_selective_sync: false)
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
context 'with use_fdw_queries_for_selective_sync enabled' do
before do
stub_feature_flags(use_fdw_queries_for_selective_sync: true)
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
end
context 'Legacy' do
before do
stub_fdw_disabled
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment