Add selective support to FDW to find syncable job artifacts

This changes also introduces a legacy finder that will help us
to remove those queries in the future.
parent 458bb965
......@@ -34,6 +34,16 @@ module Geo
Geo::JobArtifactRegistry.count
end
def syncable
if use_legacy_queries_for_selective_sync?
legacy_finder.syncable
elsif selective_sync?
job_artifacts.geo_syncable
else
Ci::JobArtifact.geo_syncable
end
end
# Find limited amount of non replicated job artifacts.
#
# You can pass a list with `except_artifact_ids:` so you can exclude items you
......@@ -70,10 +80,6 @@ module Geo
end
# rubocop: enable CodeReuse/ActiveRecord
def syncable
all.geo_syncable
end
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_artifact_ids: [])
find_failed_registries
......@@ -94,15 +100,23 @@ module Geo
private
# rubocop: disable CodeReuse/ActiveRecord
def all
# rubocop:disable CodeReuse/Finder
def legacy_finder
@legacy_finder ||= Geo::LegacyJobArtifactRegistryFinder.new(current_node: current_node)
end
# rubocop:enable CodeReuse/Finder
def fdw_geo_node
@fdw_geo_node ||= Geo::Fdw::GeoNode.find(current_node.id)
end
def job_artifacts
if selective_sync?
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
Geo::Fdw::Ci::JobArtifact.project_id_in(fdw_geo_node.projects)
else
Ci::JobArtifact.all
Geo::Fdw::Ci::JobArtifact.all
end
end
# rubocop: enable CodeReuse/ActiveRecord
def find_synced
if use_legacy_queries?
......@@ -146,14 +160,14 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find
fdw_all.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.geo_syncable
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find_unsynced(except_artifact_ids:)
fdw_all.joins("LEFT OUTER JOIN job_artifact_registry
job_artifacts.joins("LEFT OUTER JOIN job_artifact_registry
ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.geo_syncable
.where(job_artifact_registry: { artifact_id: nil })
......@@ -163,23 +177,13 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find_migrated_local(except_artifact_ids:)
fdw_all.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.with_files_stored_remotely
.where.not(id: except_artifact_ids)
.merge(Geo::JobArtifactRegistry.all)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def fdw_all
if selective_sync?
Geo::Fdw::Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
Geo::Fdw::Ci::JobArtifact.all
end
end
# rubocop: enable CodeReuse/ActiveRecord
def fdw_table
Geo::Fdw::Ci::JobArtifact.table_name
end
......@@ -225,7 +229,7 @@ module Geo
registry_artifact_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_artifact_ids
legacy_inner_join_registry_ids(
all.with_files_stored_remotely,
legacy_finder.job_artifacts.with_files_stored_remotely,
registry_artifact_ids,
Ci::JobArtifact
)
......
# frozen_string_literal: true
module Geo
class LegacyJobArtifactRegistryFinder < RegistryFinder
def syncable
job_artifacts.geo_syncable
end
def job_artifacts
if selective_sync?
Ci::JobArtifact.project_id_in(current_node.projects)
else
Ci::JobArtifact.all
end
end
end
end
......@@ -17,6 +17,7 @@ module EE
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :geo_syncable, -> { with_files_stored_locally.not_expired }
scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) }
scope :with_files_stored_remotely, -> { where(file_store: ::JobArtifactUploader::Store::REMOTE) }
scope :security_reports, -> do
......
......@@ -14,6 +14,7 @@ module Geo
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :geo_syncable, -> { with_files_stored_locally.not_expired }
scope :project_id_in, ->(ids) { joins(:project).merge(Geo::Fdw::Project.id_in(ids)) }
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment