Add postgres_fdw support to find unsynced projects

parent ec6e396c
module Geo
class ProjectRegistryFinder
attr_reader :current_node
def initialize(current_node: nil)
@current_node = current_node
end
def find_unsynced_projects(batch_size:)
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if Gitlab::Geo.fdw? && !selective_sync
fdw_find_unsynced_projects
else
legacy_find_unsynced_projects
end
relation.limit(batch_size)
end
protected
def selective_sync
current_node.restricted_project_ids
end
#
# FDW accessors
#
def fdw_find_unsynced_projects
fdw_table = Geo::Fdw::Project.table_name
Geo::Fdw::Project.joins("LEFT OUTER JOIN project_registry ON project_registry.project_id = #{fdw_table}.id").where('project_registry.project_id IS NULL')
end
#
# Legacy accessors (non FDW)
#
def legacy_find_unsynced_projects
registry_project_ids = current_node.project_registries.pluck(:project_id)
return current_node.projects if registry_project_ids.empty?
joined_relation = current_node.projects.joins(<<~SQL)
LEFT OUTER JOIN
(VALUES #{registry_project_ids.map { |id| "(#{id}, 't')" }.join(',')})
project_registry(project_id, registry_present)
ON projects.id = project_registry.project_id
SQL
joined_relation.where(project_registry: { registry_present: [nil, false] })
end
end
end
module Geo
module Fdw
class Project < ::Geo::BaseFdw
self.table_name = Gitlab::Geo.fdw_table('projects')
end
end
end
...@@ -150,27 +150,6 @@ class GeoNode < ActiveRecord::Base ...@@ -150,27 +150,6 @@ class GeoNode < ActiveRecord::Base
end end
end end
# These are projects that meet the project restriction but haven't yet been
# synced (i.e., do not yet have a project registry entry).
#
# This query requires data from two different databases, and unavoidably
# plucks a list of project IDs from one into the other. This will not scale
# well with the number of synchronized projects - the query will increase
# linearly in size - so this should be replaced with postgres_fdw ASAP.
def unsynced_projects
registry_project_ids = project_registries.pluck(:project_id)
return projects if registry_project_ids.empty?
joined_relation = projects.joins(<<~SQL)
LEFT OUTER JOIN
(VALUES #{registry_project_ids.map { |id| "(#{id}, 't')" }.join(',')})
project_registry(project_id, registry_present)
ON projects.id = project_registry.project_id
SQL
joined_relation.where(project_registry: { registry_present: [nil, false] })
end
def uploads def uploads
if restricted_project_ids if restricted_project_ids
uploads_table = Upload.arel_table uploads_table = Upload.arel_table
......
...@@ -12,6 +12,10 @@ module Geo ...@@ -12,6 +12,10 @@ module Geo
{ id: project_id, job_id: job_id } if job_id { id: project_id, job_id: job_id } if job_id
end end
def finder
@finder ||= ProjectRegistryFinder.new(current_node: current_node)
end
def load_pending_resources def load_pending_resources
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size) resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size remaining_capacity = db_retrieve_batch_size - resources.size
...@@ -24,9 +28,8 @@ module Geo ...@@ -24,9 +28,8 @@ module Geo
end end
def find_project_ids_not_synced(batch_size:) def find_project_ids_not_synced(batch_size:)
healthy_shards_restriction(current_node.unsynced_projects) healthy_shards_restriction(finder.find_unsynced_projects(batch_size: batch_size))
.reorder(last_repository_updated_at: :desc) .reorder(last_repository_updated_at: :desc)
.limit(batch_size)
.pluck(:id) .pluck(:id)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment