Use streaming instead of OFFSET/LIMIT

Streams query results one by one until there are none left to fetch.
This replaces OFFSET/LIMIT pagination and improves the performance of
the query that returns the projects to sync on large GitLab instances.
parent 9cd06299
......@@ -10,7 +10,7 @@
# .execute.
module Geo
class ProjectUnsyncedFinder
def initialize(current_node:, shard_name:, batch_size:)
def initialize(current_node:, shard_name:, batch_size: nil)
@current_node = Geo::Fdw::GeoNode.find(current_node.id)
@shard_name = shard_name
@batch_size = batch_size
......@@ -20,10 +20,9 @@ module Geo
def execute
return Geo::Fdw::Project.none unless valid_shard?
projects
.missing_project_registry
.within_shards(shard_name)
.limit(batch_size)
relation = projects.missing_project_registry.within_shards(shard_name)
relation = relation.limit(batch_size) unless batch_size.nil?
relation
end
# rubocop:enable CodeReuse/ActiveRecord
......
......@@ -33,6 +33,7 @@
- geo:geo_project_sync
- geo:geo_container_repository_sync
- geo:geo_rename_repository
- geo:geo_secondary_repository_backfill
- geo:geo_repositories_clean_up
- geo:geo_repository_cleanup
- geo:geo_repository_destroy
......
......@@ -3,7 +3,8 @@
module Geo
class RepositorySyncWorker < Geo::Scheduler::Secondary::PerShardSchedulerWorker
def schedule_job(shard_name)
RepositoryShardSyncWorker.perform_async(shard_name)
# TODO: Put this behind a feature flag
Geo::Secondary::RepositoryBackfillWorker.perform_async(shard_name)
end
end
end
# frozen_string_literal: true

module Geo
  module Secondary
    # Backfills repositories on a Geo secondary node for a single shard.
    #
    # Streams the IDs of unsynced projects from the tracking database
    # using PostgreSQL single-row mode (instead of OFFSET/LIMIT batches)
    # and schedules a Geo::ProjectSyncWorker per project, respecting a
    # per-shard capacity limit, a run-time budget, and an exclusive lease.
    class RepositoryBackfillWorker
      include ApplicationWorker
      include ExclusiveLeaseGuard
      include GeoQueue
      include ::Gitlab::Geo::LogHelpers
      include ::Gitlab::Utils::StrongMemoize

      # How long the exclusive lease is valid before it must be renewed.
      LEASE_TIMEOUT = 60.minutes
      # Maximum wall-clock time (in seconds) a single run may take.
      RUN_TIME = 60.minutes.to_i

      def initialize
        @scheduled_jobs = []
      end

      # Entry point. Guards against misconfiguration (no Geo database,
      # not a secondary, unhealthy shard), then streams unsynced project
      # IDs and schedules sync jobs until it runs out of work, time,
      # lease, or the node is disabled.
      def perform(shard_name)
        @shard_name = shard_name
        @start_time = Time.now.utc
        @loops = 0

        unless Gitlab::Geo.geo_database_configured?
          log_info('Geo database not configured')
          return
        end

        unless Gitlab::Geo.secondary?
          log_info('Current node not a secondary')
          return
        end

        unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
          log_info("Shard (#{shard_name}) is not healthy")
          return
        end

        try_obtain_lease do
          log_info('Repository backfilling started')
          reason = :unknown

          begin
            connection.send_query(unsynced_projects_ids.to_sql)
            connection.set_single_row_mode

            reason = loop do
              # Count iterations so `total_loops` in the final log entry
              # is meaningful (previously it was never incremented and
              # always logged 0).
              @loops += 1

              break :node_disabled unless node_enabled?
              break :over_time if over_time?
              break :lease_lost unless renew_lease!

              update_jobs_in_progress

              # This will stream the results one by one
              # until there are no more results to fetch.
              result = connection.get_result
              # Previously a bare `break` here logged `reason: nil` on
              # normal completion; report it explicitly instead.
              break :complete if result.nil?

              result.check
              result.each do |row|
                schedule_job(row['id'])
              end

              # Back off briefly when the shard is at capacity so we do
              # not schedule more jobs than it can handle.
              sleep(1) if over_capacity?
            end
          rescue => error
            reason = :error
            log_error('Repository backfilling error', error)
            raise
          ensure
            drain_pending_results
            log_info('Repository backfilling finished', total_loops: loops, duration: Time.now.utc - start_time, reason: reason)
          end
        end
      end

      private

      attr_reader :shard_name, :start_time, :loops, :scheduled_jobs

      def base_log_data(message)
        super(message).merge(worker_metadata)
      end

      def lease_key
        @lease_key ||= "#{self.class.name.underscore}:shard:#{shard_name}"
      end

      def lease_timeout
        LEASE_TIMEOUT
      end

      def worker_metadata
        { shard: shard_name }
      end

      # Raw PG connection to the Geo tracking database; memoized so the
      # streamed query and the drain below use the same connection.
      def connection
        strong_memoize(:connection) do
          Geo::TrackingBase.connection.raw_connection
        end
      end

      # Consumes any results still pending on the raw connection (left
      # over when the loop breaks early, e.g. on :over_time). Leaving
      # pending results behind would make subsequent queries on this
      # shared connection fail.
      def drain_pending_results
        nil while connection.get_result
      rescue => error
        log_error('Error draining streamed results', error)
      end

      # rubocop: disable CodeReuse/ActiveRecord
      def schedule_job(project_id)
        job_id = Geo::ProjectSyncWorker.perform_async(project_id, sync_repository: true, sync_wiki: true)

        if job_id
          @scheduled_jobs << { job_id: job_id }
          log_info("Repository scheduled for backfilling", project_id: project_id, job_id: job_id)
        else
          log_info("Repository could not be scheduled for backfilling", project_id: project_id)
        end
      end
      # rubocop: enable CodeReuse/ActiveRecord

      # rubocop: disable CodeReuse/ActiveRecord
      def unsynced_projects_ids
        Geo::ProjectUnsyncedFinder
          .new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
          .execute
          .reorder(last_repository_updated_at: :desc)
          .select(:id)
      end
      # rubocop: enable CodeReuse/ActiveRecord

      # Drops completed jobs from the in-progress list by querying
      # Sidekiq for each scheduled job's status.
      def update_jobs_in_progress
        scheduled_job_ids = scheduled_jobs.map { |data| data[:job_id] }
        status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)

        # SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
        # For each entry, first use `zip` to make { job_id: 123 } -> [ { job_id: 123 }, bool ]
        # Next, filter out the jobs that have completed.
        @scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
      end

      def max_capacity
        healthy_count = Gitlab::ShardHealthCache.healthy_shard_count

        # If we don't have a count, that means that for some reason
        # RepositorySyncWorker stopped running/updating the cache. We might
        # be trying to shut down Geo while this job may still be running.
        return 0 unless healthy_count.to_i > 0

        capacity_per_shard = Gitlab::Geo.current_node.repos_max_capacity / healthy_count

        [1, capacity_per_shard.to_i].max
      end

      def node_enabled?
        # Only check every minute to avoid polling the DB excessively
        unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
          @last_enabled_check = Time.now
          clear_memoization(:current_node_enabled)
        end

        strong_memoize(:current_node_enabled) do
          Gitlab::Geo.current_node_enabled?
        end
      end

      def run_time
        RUN_TIME
      end

      def over_capacity?
        scheduled_jobs.size >= max_capacity
      end

      def over_time?
        (Time.now.utc - start_time) >= run_time
      end
    end
  end
end
......@@ -21,8 +21,26 @@ module EE
allow(::Gitlab::Geo).to receive(:secondary?).and_return(true)
end
# Forces the given Geo node to report itself as disabled.
def stub_node_disabled(node)
  allow(node).to receive(:enabled?) { false }
end
# Stubs the node's `selective_sync?` predicate to the supplied value.
def stub_selective_sync(node, value)
  allow(node).to receive(:selective_sync?) { value }
end
# Marks the given shard name (or list of names) as healthy in the
# shard health cache.
def stub_healthy_shards(shards)
  healthy = Array(shards)
  ::Gitlab::ShardHealthCache.update(healthy)
end
# Runs the given block with `Gitlab::Geo.geo_database_configured?`
# stubbed to return false.
#
# The un-stub lives in `ensure` so it happens even when the block
# raises; otherwise DatabaseCleaner will have issues since it will
# appear as though the tracking DB were not available. (The previous
# version skipped the un-stub on an exception, and captured an unused
# `&block` parameter while using `yield`.)
def with_no_geo_database_configured
  allow(::Gitlab::Geo).to receive(:geo_database_configured?).and_return(false)

  yield
ensure
  allow(::Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
end
end
end
......@@ -167,7 +167,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_fdw, :r
end
it 'does not schedule jobs when not running on a secondary' do
allow(Gitlab::Geo).to receive(:primary?) { false }
allow(Gitlab::Geo).to receive(:secondary?) { false }
expect(secondary_single_worker).not_to receive(:perform_async)
......
# frozen_string_literal: true
require 'spec_helper'
describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab_redis_cache do
  include EE::GeoHelpers

  let(:primary) { create(:geo_node, :primary) }
  let(:secondary) { create(:geo_node) }
  let(:shard_name) { Gitlab.config.repositories.storages.keys.first }

  before do
    # Run as a healthy secondary by default; each example below undoes
    # exactly one precondition to exercise the corresponding guard
    # clause in the worker's #perform.
    stub_current_geo_node(secondary)
    stub_shard_healthy(shard_name, true)
  end

  describe '#perform' do
    # NOTE: the `not_to receive` expectations are deliberately declared
    # BEFORE `subject.perform` — RSpec message expectations must be set
    # up before the action runs.
    it 'does not schedule jobs when Geo database is not configured' do
      create(:project)

      expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)

      with_no_geo_database_configured do
        subject.perform(shard_name)
      end
    end

    it 'does not schedule jobs when not running on a Geo secondary node' do
      stub_current_geo_node(primary)
      create(:project)

      expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'does not schedule jobs when shard is not healthy' do
      stub_shard_healthy(shard_name, false)
      create(:project)

      expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'does not schedule jobs when the Geo secondary node is disabled' do
      stub_node_disabled(secondary)
      create(:project)

      expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'does not schedule jobs for projects on other shards' do
      project = create(:project)
      # Bypass validations/callbacks so the project sits on a shard
      # other than the one the worker is asked to process.
      project.update_column(:repository_storage, 'other')

      expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'schedules a job for each unsynced project' do
      create_list(:project, 2)

      expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(true)

      subject.perform(shard_name)
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment