Commit 461adca6 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre Committed by Nick Thomas

Resolve "Fix difference in FDW / non-FDW queries for Geo::FileRegistry queries"

parent 834ae7bc
...@@ -2,6 +2,8 @@ module Geo ...@@ -2,6 +2,8 @@ module Geo
module Fdw module Fdw
class LfsObject < ::Geo::BaseFdw class LfsObject < ::Geo::BaseFdw
self.table_name = Gitlab::Geo.fdw_table('lfs_objects') self.table_name = Gitlab::Geo.fdw_table('lfs_objects')
scope :with_files_stored_locally, ->() { where(file_store: [nil, LfsObjectUploader::LOCAL_STORE]) }
end end
end end
end end
...@@ -12,8 +12,16 @@ module Geo ...@@ -12,8 +12,16 @@ module Geo
{ id: object_db_id, type: object_type, job_id: job_id } if job_id { id: object_db_id, type: object_type, job_id: job_id } if job_id
end end
def finder def attachments_finder
@finder ||= FileRegistryFinder.new(current_node: current_node) @attachments_finder ||= AttachmentRegistryFinder.new(current_node: current_node)
end
def file_registry_finder
@file_registry_finder ||= FileRegistryFinder.new(current_node: current_node)
end
def lfs_objects_finder
@lfs_objects_finder ||= LfsObjectRegistryFinder.new(current_node: current_node)
end end
# Pools for new resources to be transferred # Pools for new resources to be transferred
...@@ -26,19 +34,36 @@ module Geo ...@@ -26,19 +34,36 @@ module Geo
if remaining_capacity.zero? if remaining_capacity.zero?
resources resources
else else
resources + finder.find_failed_objects(batch_size: remaining_capacity) resources + find_failed_upload_object_ids(batch_size: remaining_capacity)
end end
end end
def find_unsynced_objects(batch_size:) def find_unsynced_objects(batch_size:)
lfs_object_ids = finder.find_nonreplicated_lfs_objects(batch_size: batch_size, except_registry_ids: scheduled_file_ids(:lfs)) lfs_object_ids = find_unsynced_lfs_objects_ids(batch_size: batch_size)
upload_objects_ids = finder.find_nonreplicated_uploads(batch_size: batch_size, except_registry_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)) attachment_ids = find_unsynced_attachments_ids(batch_size: batch_size)
interleave(lfs_object_ids, attachment_ids)
end
def find_unsynced_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_registry_ids: scheduled_file_ids(:lfs))
.pluck(:id)
.map { |id| [id, :lfs] }
end
def find_unsynced_attachments_ids(batch_size:)
attachments_finder.find_unsynced_attachments(batch_size: batch_size, except_registry_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
interleave(lfs_object_ids, upload_objects_ids) def find_failed_upload_object_ids(batch_size:)
file_registry_finder.find_failed_file_registries(batch_size: batch_size)
.pluck(:file_id, :file_type)
end end
def scheduled_file_ids(file_types) def scheduled_file_ids(file_types)
file_types = Array(file_types) unless file_types.is_a? Array file_types = Array(file_types)
scheduled_jobs.select { |data| file_types.include?(data[:type]) }.map { |data| data[:id] } scheduled_jobs.select { |data| file_types.include?(data[:type]) }.map { |data| data[:id] }
end end
......
---
title: Geo - Fix difference in FDW / non-FDW queries for Geo::FileRegistry queries
merge_request: 3714
author:
type: fixed
module Geo module Geo
class AttachmentRegistryFinder < RegistryFinder class AttachmentRegistryFinder < FileRegistryFinder
def attachments
if selective_sync?
Upload.where(group_uploads.or(project_uploads).or(other_uploads))
else
Upload.all
end
end
def count_attachments def count_attachments
uploads.count attachments.count
end end
def count_synced_attachments def count_synced_attachments
...@@ -34,12 +42,25 @@ module Geo ...@@ -34,12 +42,25 @@ module Geo
relation relation
end end
def uploads # Find limited amount of non replicated attachments.
if selective_sync? #
Upload.where(group_uploads.or(project_uploads).or(other_uploads)) # You can pass a list with `except_registry_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query
def find_unsynced_attachments(batch_size:, except_registry_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced_attachments(except_registry_ids: except_registry_ids)
else else
Upload.all fdw_find_unsynced_attachments(except_registry_ids: except_registry_ids)
end end
relation.limit(batch_size)
end end
private private
...@@ -85,29 +106,45 @@ module Geo ...@@ -85,29 +106,45 @@ module Geo
.merge(Geo::FileRegistry.attachments) .merge(Geo::FileRegistry.attachments)
end end
def fdw_find_unsynced_attachments(except_registry_ids:)
fdw_table = Geo::Fdw::Upload.table_name
upload_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
Geo::Fdw::Upload.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type IN (#{upload_types})")
.where(file_registry: { id: nil })
.where.not(id: except_registry_ids)
end
# #
# Legacy accessors (non FDW) # Legacy accessors (non FDW)
# #
def legacy_find_synced_attachments def legacy_find_synced_attachments
legacy_find_attachments(Geo::FileRegistry.attachments.synced.pluck(:file_id)) legacy_inner_join_registry_ids(
attachments,
Geo::FileRegistry.attachments.synced.pluck(:file_id),
Upload
)
end end
def legacy_find_failed_attachments def legacy_find_failed_attachments
legacy_find_attachments(Geo::FileRegistry.attachments.failed.pluck(:file_id)) legacy_inner_join_registry_ids(
attachments,
Geo::FileRegistry.attachments.failed.pluck(:file_id),
Upload
)
end end
def legacy_find_attachments(registry_file_ids) def legacy_find_unsynced_attachments(except_registry_ids:)
return Upload.none if registry_file_ids.empty? registry_ids = legacy_pluck_registry_ids(file_types: Geo::FileService::DEFAULT_OBJECT_TYPES, except_registry_ids: except_registry_ids)
joined_relation = uploads.joins(<<~SQL)
INNER JOIN
(VALUES #{registry_file_ids.map { |id| "(#{id})" }.join(',')})
file_registry(file_id)
ON #{Upload.table_name}.id = file_registry.file_id
SQL
joined_relation legacy_left_outer_join_registry_ids(
attachments,
registry_ids,
Upload
)
end end
end end
end end
module Geo module Geo
class FileRegistryFinder < RegistryFinder class FileRegistryFinder < RegistryFinder
def find_failed_objects(batch_size:) def find_failed_file_registries(batch_size:)
Geo::FileRegistry Geo::FileRegistry.failed.retry_due.limit(batch_size)
.failed
.retry_due
.limit(batch_size)
.pluck(:file_id, :file_type)
end
# Find limited amount of non replicated lfs objects.
#
# You can pass a list with `except_registry_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query
def find_nonreplicated_lfs_objects(batch_size:, except_registry_ids:)
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if use_legacy_queries?
legacy_find_nonreplicated_lfs_objects(except_registry_ids: except_registry_ids)
else
fdw_find_nonreplicated_lfs_objects
end
relation
.limit(batch_size)
.pluck(:id)
.map { |id| [id, :lfs] }
end
# Find limited amount of non replicated uploads.
#
# You can pass a list with `except_registry_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query
def find_nonreplicated_uploads(batch_size:, except_registry_ids:)
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if use_legacy_queries?
legacy_find_nonreplicated_uploads(except_registry_ids: except_registry_ids)
else
fdw_find_nonreplicated_uploads
end
relation
.limit(batch_size)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end end
protected protected
#
# FDW accessors
#
def fdw_find_nonreplicated_lfs_objects
fdw_table = Geo::Fdw::LfsObject.table_name
# Filter out objects in object storage (this is done in GeoNode#lfs_objects)
Geo::Fdw::LfsObject.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type = 'lfs'")
.where("#{fdw_table}.file_store IS NULL OR #{fdw_table}.file_store = #{LfsObjectUploader::LOCAL_STORE}")
.where('file_registry.file_id IS NULL')
end
def fdw_find_nonreplicated_uploads
fdw_table = Geo::Fdw::Upload.table_name
upload_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
Geo::Fdw::Upload.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type IN (#{upload_types})")
.where('file_registry.file_id IS NULL')
end
#
# Legacy accessors (non FDW)
#
def legacy_find_nonreplicated_lfs_objects(except_registry_ids:)
registry_ids = legacy_pluck_registry_ids(file_types: :lfs, except_registry_ids: except_registry_ids)
legacy_filter_registry_ids(
lfs_objects_finder.lfs_objects,
registry_ids,
LfsObject.table_name
)
end
def legacy_find_nonreplicated_uploads(except_registry_ids:)
registry_ids = legacy_pluck_registry_ids(file_types: Geo::FileService::DEFAULT_OBJECT_TYPES, except_registry_ids: except_registry_ids)
legacy_filter_registry_ids(
attachments_finder.uploads,
registry_ids,
Upload.table_name
)
end
# This query requires data from two different databases, and unavoidably
# plucks a list of file IDs from one into the other. This will not scale
# well with the number of synchronized files--the query will increase
# linearly in size--so this should be replaced with postgres_fdw ASAP.
def legacy_filter_registry_ids(objects, registry_ids, table_name)
return objects if registry_ids.empty?
joined_relation = objects.joins(<<~SQL)
LEFT OUTER JOIN
(VALUES #{registry_ids.map { |id| "(#{id}, 't')" }.join(',')})
file_registry(file_id, registry_present)
ON #{table_name}.id = file_registry.file_id
SQL
joined_relation.where(file_registry: { registry_present: [nil, false] })
end
def legacy_pluck_registry_ids(file_types:, except_registry_ids:) def legacy_pluck_registry_ids(file_types:, except_registry_ids:)
ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id) ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
(ids + except_registry_ids).uniq (ids + except_registry_ids).uniq
end end
def attachments_finder
@attachments_finder ||= AttachmentRegistryFinder.new(current_node: current_node)
end
def lfs_objects_finder
@lfs_objects_finder ||= LfsObjectRegistryFinder.new(current_node: current_node)
end
end end
end end
module Geo module Geo
class LfsObjectRegistryFinder < RegistryFinder class LfsObjectRegistryFinder < FileRegistryFinder
def count_lfs_objects def count_lfs_objects
lfs_objects.count lfs_objects.count
end end
...@@ -26,6 +26,27 @@ module Geo ...@@ -26,6 +26,27 @@ module Geo
relation.count relation.count
end end
# Find limited amount of non replicated lfs objects.
#
# You can pass a list with `except_registry_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query
def find_unsynced_lfs_objects(batch_size:, except_registry_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced_lfs_objects(except_registry_ids: except_registry_ids)
else
fdw_find_unsynced_lfs_objects(except_registry_ids: except_registry_ids)
end
relation.limit(batch_size)
end
def lfs_objects def lfs_objects
relation = relation =
if selective_sync? if selective_sync?
...@@ -47,29 +68,50 @@ module Geo ...@@ -47,29 +68,50 @@ module Geo
Geo::FileRegistry.lfs_objects.failed Geo::FileRegistry.lfs_objects.failed
end end
#
# FDW accessors
#
def fdw_find_unsynced_lfs_objects(except_registry_ids:)
fdw_table = Geo::Fdw::LfsObject.table_name
# Filter out objects in object storage (this is done in GeoNode#lfs_objects)
Geo::Fdw::LfsObject.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type = 'lfs'")
.merge(Geo::Fdw::LfsObject.with_files_stored_locally)
.where(file_registry: { id: nil })
.where.not(id: except_registry_ids)
end
#
# Legacy accessors (non FDW)
#
def legacy_find_synced_lfs_objects def legacy_find_synced_lfs_objects
legacy_find_lfs_objects(find_synced_lfs_objects_registries.pluck(:file_id)) legacy_inner_join_registry_ids(
lfs_objects,
find_synced_lfs_objects_registries.pluck(:file_id),
LfsObject
)
end end
def legacy_find_failed_lfs_objects def legacy_find_failed_lfs_objects
legacy_find_lfs_objects(find_failed_lfs_objects_registries.pluck(:file_id)) legacy_inner_join_registry_ids(
lfs_objects,
find_failed_lfs_objects_registries.pluck(:file_id),
LfsObject
)
end end
def legacy_find_lfs_objects(registry_file_ids) def legacy_find_unsynced_lfs_objects(except_registry_ids:)
return LfsObject.none if registry_file_ids.empty? registry_ids = legacy_pluck_registry_ids(file_types: :lfs, except_registry_ids: except_registry_ids)
lfs_objects = LfsObject.joins(:projects)
.where(projects: { id: current_node.projects })
.with_files_stored_locally
joined_relation = lfs_objects.joins(<<~SQL)
INNER JOIN
(VALUES #{registry_file_ids.map { |id| "(#{id})" }.join(',')})
file_registry(file_id)
ON #{LfsObject.table_name}.id = file_registry.file_id
SQL
joined_relation legacy_left_outer_join_registry_ids(
lfs_objects,
registry_ids,
LfsObject
)
end end
end end
end end
...@@ -7,7 +7,7 @@ module Geo ...@@ -7,7 +7,7 @@ module Geo
def count_synced_project_registries def count_synced_project_registries
relation = relation =
if selective_sync? if selective_sync?
legacy_find_synced_project_registries legacy_find_synced_projects
else else
find_synced_project_registries find_synced_project_registries
end end
...@@ -22,7 +22,7 @@ module Geo ...@@ -22,7 +22,7 @@ module Geo
def find_failed_project_registries(type = nil) def find_failed_project_registries(type = nil)
relation = relation =
if selective_sync? if selective_sync?
legacy_find_filtered_failed_project_registries(type) legacy_find_filtered_failed_projects(type)
else else
find_filtered_failed_project_registries(type) find_filtered_failed_project_registries(type)
end end
...@@ -80,7 +80,7 @@ module Geo ...@@ -80,7 +80,7 @@ module Geo
# @return [ActiveRecord::Relation<Geo::Fdw::Project>] # @return [ActiveRecord::Relation<Geo::Fdw::Project>]
def fdw_find_unsynced_projects def fdw_find_unsynced_projects
Geo::Fdw::Project.joins("LEFT OUTER JOIN project_registry ON project_registry.project_id = #{fdw_table}.id") Geo::Fdw::Project.joins("LEFT OUTER JOIN project_registry ON project_registry.project_id = #{fdw_table}.id")
.where('project_registry.project_id IS NULL') .where(project_registry: { project_id: nil })
end end
# @return [ActiveRecord::Relation<Geo::Fdw::Project>] # @return [ActiveRecord::Relation<Geo::Fdw::Project>]
...@@ -96,61 +96,39 @@ module Geo ...@@ -96,61 +96,39 @@ module Geo
# @return [ActiveRecord::Relation<Project>] list of unsynced projects # @return [ActiveRecord::Relation<Project>] list of unsynced projects
def legacy_find_unsynced_projects def legacy_find_unsynced_projects
registry_project_ids = Geo::ProjectRegistry.pluck(:project_id) legacy_left_outer_join_registry_ids(
return current_node.projects if registry_project_ids.empty? current_node.projects,
Geo::ProjectRegistry.pluck(:project_id),
joined_relation = current_node.projects.joins(<<~SQL) Project
LEFT OUTER JOIN )
(VALUES #{registry_project_ids.map { |id| "(#{id}, 't')" }.join(',')})
project_registry(project_id, registry_present)
ON projects.id = project_registry.project_id
SQL
joined_relation.where(project_registry: { registry_present: [nil, false] })
end end
# @return [ActiveRecord::Relation<Project>] list of projects updated recently # @return [ActiveRecord::Relation<Project>] list of projects updated recently
def legacy_find_projects_updated_recently def legacy_find_projects_updated_recently
legacy_find_projects(Geo::ProjectRegistry.dirty.retry_due.pluck(:project_id)) legacy_inner_join_registry_ids(
end current_node.projects,
Geo::ProjectRegistry.dirty.retry_due.pluck(:project_id),
# @return [ActiveRecord::Relation<Geo::ProjectRegistry>] list of synced projects Project
def legacy_find_synced_project_registries )
legacy_find_project_registries(Geo::ProjectRegistry.synced) end
end
# @return [ActiveRecord::Relation<Project>] list of synced projects
# @return [ActiveRecord::Relation<Geo::ProjectRegistry>] list of projects that sync has failed def legacy_find_synced_projects
def legacy_find_filtered_failed_project_registries(type = nil) legacy_inner_join_registry_ids(
project_registries = find_filtered_failed_project_registries(type) current_node.projects,
legacy_find_project_registries(project_registries) Geo::ProjectRegistry.synced.pluck(:project_id),
end Project
)
# @return [ActiveRecord::Relation<Project>] end
def legacy_find_projects(registry_project_ids)
return Project.none if registry_project_ids.empty? # @return [ActiveRecord::Relation<Project>] list of projects that sync has failed
def legacy_find_filtered_failed_projects(type = nil)
joined_relation = current_node.projects.joins(<<~SQL) legacy_inner_join_registry_ids(
INNER JOIN find_filtered_failed_project_registries(type),
(VALUES #{registry_project_ids.map { |id| "(#{id})" }.join(',')}) current_node.projects.pluck(:id),
project_registry(project_id) Geo::ProjectRegistry,
ON #{Project.table_name}.id = project_registry.project_id foreign_key: :project_id
SQL )
joined_relation
end
# @return [ActiveRecord::Relation<Geo::ProjectRegistry>]
def legacy_find_project_registries(project_registries)
return Geo::ProjectRegistry.none if project_registries.empty?
joined_relation = project_registries.joins(<<~SQL)
INNER JOIN
(VALUES #{current_node.projects.pluck(:id).map { |id| "(#{id})" }.join(',')})
projects(id)
ON #{Geo::ProjectRegistry.table_name}.project_id = projects.id
SQL
joined_relation
end end
end end
end end
...@@ -15,5 +15,31 @@ module Geo ...@@ -15,5 +15,31 @@ module Geo
# queries, so we fallback to the legacy version for now. # queries, so we fallback to the legacy version for now.
!Gitlab::Geo.fdw? || selective_sync? !Gitlab::Geo.fdw? || selective_sync?
end end
def legacy_inner_join_registry_ids(objects, registry_ids, klass, foreign_key: :id)
return klass.none if registry_ids.empty?
joined_relation = objects.joins(<<~SQL)
INNER JOIN
(VALUES #{registry_ids.map { |id| "(#{id})" }.join(',')})
registry(id)
ON #{klass.table_name}.#{foreign_key} = registry.id
SQL
joined_relation
end
def legacy_left_outer_join_registry_ids(objects, registry_ids, klass)
return objects if registry_ids.empty?
joined_relation = objects.joins(<<~SQL)
LEFT OUTER JOIN
(VALUES #{registry_ids.map { |id| "(#{id}, 't')" }.join(',')})
registry(id, registry_present)
ON #{klass.table_name}.id = registry.id
SQL
joined_relation.where(registry: { registry_present: [nil, false] })
end
end end
end end
...@@ -11,10 +11,10 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -11,10 +11,10 @@ describe Geo::AttachmentRegistryFinder, :geo do
let(:synced_project) { create(:project, group: synced_group) } let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project, group: unsynced_group) } let(:unsynced_project) { create(:project, group: unsynced_group) }
let(:upload_1) { create(:upload, model: synced_group) } let!(:upload_1) { create(:upload, model: synced_group) }
let(:upload_2) { create(:upload, model: unsynced_group) } let!(:upload_2) { create(:upload, model: unsynced_group) }
let(:upload_3) { create(:upload, :issuable_upload, model: synced_project) } let!(:upload_3) { create(:upload, :issuable_upload, model: synced_project) }
let(:upload_4) { create(:upload, model: unsynced_project) } let!(:upload_4) { create(:upload, model: unsynced_project) }
let(:upload_5) { create(:upload, model: synced_project) } let(:upload_5) { create(:upload, model: synced_project) }
let(:upload_6) { create(:upload, :personal_snippet) } let(:upload_6) { create(:upload, :personal_snippet) }
let(:upload_7) { create(:upload, model: synced_subgroup) } let(:upload_7) { create(:upload, model: synced_subgroup) }
...@@ -112,6 +112,30 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -112,6 +112,30 @@ describe Geo::AttachmentRegistryFinder, :geo do
end end
end end
end end
describe '#find_unsynced_attachments' do
it 'delegates to #fdw_find_unsynced_attachments' do
expect(subject).to receive(:fdw_find_unsynced_attachments).and_call_original
subject.find_unsynced_attachments(batch_size: 10)
end
it 'returns uploads without an entry on the tracking database' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: true)
uploads = subject.find_unsynced_attachments(batch_size: 10)
expect(uploads.map(&:id)).to match_array([upload_2.id, upload_3.id, upload_4.id])
end
it 'excludes uploads without an entry on the tracking database' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: true)
uploads = subject.find_unsynced_attachments(batch_size: 10, except_registry_ids: [upload_2.id])
expect(uploads.map(&:id)).to match_array([upload_3.id, upload_4.id])
end
end
end end
context 'Legacy' do context 'Legacy' do
...@@ -198,5 +222,29 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -198,5 +222,29 @@ describe Geo::AttachmentRegistryFinder, :geo do
end end
end end
end end
describe '#find_unsynced_attachments' do
it 'delegates to #legacy_find_unsynced_attachments' do
expect(subject).to receive(:legacy_find_unsynced_attachments).and_call_original
subject.find_unsynced_attachments(batch_size: 10)
end
it 'returns LFS objects without an entry on the tracking database' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: true)
uploads = subject.find_unsynced_attachments(batch_size: 10)
expect(uploads).to match_array([upload_2, upload_3, upload_4])
end
it 'excludes uploads without an entry on the tracking database' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: true)
uploads = subject.find_unsynced_attachments(batch_size: 10, except_registry_ids: [upload_2.id])
expect(uploads).to match_array([upload_3, upload_4])
end
end
end end
end end
require 'spec_helper'
describe Geo::FileRegistryFinder, :geo do
include ::EE::GeoHelpers
let(:secondary) { create(:geo_node) }
subject { described_class.new(current_node: secondary) }
before do
stub_current_geo_node(secondary)
end
describe '#find_failed_file_registries' do
it 'returs uploads that sync has failed' do
failed_lfs_registry = create(:geo_file_registry, :lfs, :with_file, success: false)
failed_file_upload = create(:geo_file_registry, :with_file, success: false)
failed_issuable_upload = create(:geo_file_registry, :with_file, success: false)
create(:geo_file_registry, :lfs, :with_file, success: true)
create(:geo_file_registry, :with_file, success: true)
uploads = subject.find_failed_file_registries(batch_size: 10)
expect(uploads).to match_array([failed_lfs_registry, failed_file_upload, failed_issuable_upload])
end
end
end
...@@ -7,9 +7,11 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -7,9 +7,11 @@ describe Geo::LfsObjectRegistryFinder, :geo do
let(:synced_group) { create(:group) } let(:synced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) } let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) } let(:unsynced_project) { create(:project) }
let(:lfs_object_1) { create(:lfs_object) }
let(:lfs_object_2) { create(:lfs_object) } let!(:lfs_object_1) { create(:lfs_object) }
let(:lfs_object_3) { create(:lfs_object) } let!(:lfs_object_2) { create(:lfs_object) }
let!(:lfs_object_3) { create(:lfs_object) }
let!(:lfs_object_4) { create(:lfs_object) }
subject { described_class.new(current_node: secondary) } subject { described_class.new(current_node: secondary) }
...@@ -100,4 +102,70 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -100,4 +102,70 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
end end
end end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :delete do
before do
skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo.fdw?
end
describe '#find_unsynced_lfs_objects' do
it 'delegates to #fdw_find_unsynced_lfs_objects' do
expect(subject).to receive(:fdw_find_unsynced_lfs_objects).and_call_original
subject.find_unsynced_lfs_objects(batch_size: 10)
end
it 'returns LFS objects without an entry on the tracking database' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10)
expect(lfs_objects.map(&:id)).to match_array([lfs_object_2.id, lfs_object_4.id])
end
it 'excludes LFS objects without an entry on the tracking database' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10, except_registry_ids: [lfs_object_2.id])
expect(lfs_objects.map(&:id)).to match_array([lfs_object_4.id])
end
end
end
context 'Legacy' do
before do
allow(Gitlab::Geo).to receive(:fdw?).and_return(false)
end
describe '#find_unsynced_lfs_objects' do
it 'delegates to #legacy_find_unsynced_lfs_objects' do
expect(subject).to receive(:legacy_find_unsynced_lfs_objects).and_call_original
subject.find_unsynced_lfs_objects(batch_size: 10)
end
it 'returns LFS objects without an entry on the tracking database' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10)
expect(lfs_objects).to match_array([lfs_object_2, lfs_object_4])
end
it 'excludes LFS objects without an entry on the tracking database' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10, except_registry_ids: [lfs_object_2.id])
expect(lfs_objects).to match_array([lfs_object_4])
end
end
end
end end
...@@ -37,8 +37,8 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -37,8 +37,8 @@ describe Geo::ProjectRegistryFinder, :geo do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update_attribute(:namespaces, [synced_group])
end end
it 'delegates to #legacy_find_synced_project_registries' do it 'delegates to #legacy_find_synced_projects' do
expect(subject).to receive(:legacy_find_synced_project_registries).and_call_original expect(subject).to receive(:legacy_find_synced_projects).and_call_original
subject.count_synced_project_registries subject.count_synced_project_registries
end end
...@@ -128,8 +128,8 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -128,8 +128,8 @@ describe Geo::ProjectRegistryFinder, :geo do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update_attribute(:namespaces, [synced_group])
end end
it 'delegates to #legacy_find_filtered_failed_project_registries' do it 'delegates to #legacy_find_filtered_failed_projects' do
expect(subject).to receive(:legacy_find_filtered_failed_project_registries).and_call_original expect(subject).to receive(:legacy_find_filtered_failed_projects).and_call_original
subject.find_failed_project_registries subject.find_failed_project_registries
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment