Commit 83b31c0e authored by Stan Hu's avatar Stan Hu

Geo: Prepare for shard-specific repository sync worker

parent cc57f0c4
module Geo
class RepositoryShardSyncWorker < Geo::BaseSchedulerWorker
attr_accessor :shard_name
def initialize(shard_name)
@shard_name = shard_name
super()
end
private
def max_capacity
current_node.repos_max_capacity
end
def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_async(project_id, Time.now)
{ id: project_id, job_id: job_id } if job_id
end
def finder
@finder ||= ProjectRegistryFinder.new(current_node: current_node)
end
def load_pending_resources
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero?
resources
else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity)
end
end
def find_project_ids_not_synced(batch_size:)
shard_restriction(finder.find_unsynced_projects(batch_size: batch_size))
.reorder(last_repository_updated_at: :desc)
.pluck(:id)
end
def find_project_ids_updated_recently(batch_size:)
shard_restriction(finder.find_projects_updated_recently(batch_size: batch_size))
.order(Gitlab::Database.nulls_first_order(:last_repository_updated_at, :desc))
.pluck(:id)
end
def shard_restriction(relation)
relation.where(repository_storage: shard_name)
end
end
end
module Geo
class RepositorySyncWorker < Geo::BaseSchedulerWorker
private
class RepositorySyncWorker
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
def max_capacity
current_node.repos_max_capacity
end
def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_async(project_id, Time.now)
{ id: project_id, job_id: job_id } if job_id
end
shards = healthy_shards
def finder
@finder ||= ProjectRegistryFinder.new(current_node: current_node)
end
def load_pending_resources
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero?
resources
else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity)
end
shards.each do |shard_name|
RepositoryShardSyncWorker.perform_async(shard_name, Time.now)
end
def find_project_ids_not_synced(batch_size:)
healthy_shards_restriction(finder.find_unsynced_projects(batch_size: batch_size))
.reorder(last_repository_updated_at: :desc)
.pluck(:id)
end
def find_project_ids_updated_recently(batch_size:)
healthy_shards_restriction(finder.find_projects_updated_recently(batch_size: batch_size))
.order(Gitlab::Database.nulls_first_order(:last_repository_updated_at, :desc))
.pluck(:id)
end
def healthy_shards_restriction(relation)
configured = Gitlab.config.repositories.storages.keys
referenced = Project.distinct(:repository_storage).pluck(:repository_storage)
healthy = healthy_shards
known = configured | referenced
return relation if (known - healthy).empty?
relation.where(repository_storage: healthy)
end
def healthy_shards
......
......@@ -2,7 +2,7 @@ require 'spec_helper'
# Disable transactions via :truncate method because a foreign table
# can't see changes inside a transaction of a different connection.
describe Geo::RepositorySyncWorker, :geo, :truncate do
describe Geo::RepositoryShardSyncWorker, :geo, :truncate do
include ::EE::GeoHelpers
let!(:primary) { create(:geo_node, :primary) }
......@@ -11,7 +11,7 @@ describe Geo::RepositorySyncWorker, :geo, :truncate do
let!(:project_in_synced_group) { create(:project, group: synced_group) }
let!(:unsynced_project) { create(:project) }
subject { described_class.new }
subject { described_class.new(Gitlab.config.repositories.storages.keys.first) }
before do
stub_current_geo_node(secondary)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment