Extract a helper method to schedule jobs

parent 54ec4cae
...@@ -30,6 +30,22 @@ module Geo ...@@ -30,6 +30,22 @@ module Geo
end end
end end
def initialize
@scheduled_jobs = []
@loops = 0
end
def perform(shard_name)
@shard_name = shard_name
@start_time = Time.now.utc
return unless healthy_node?
try_obtain_lease do
schedule_jobs
end
end
private private
def base_log_data(message) def base_log_data(message)
......
...@@ -5,56 +5,6 @@ module Geo ...@@ -5,56 +5,6 @@ module Geo
class RepositoryBackfillWorker class RepositoryBackfillWorker
include Geo::Secondary::BackfillWorker include Geo::Secondary::BackfillWorker
def initialize
@scheduled_jobs = []
end
def perform(shard_name)
@shard_name = shard_name
@start_time = Time.now.utc
@loops = 0
return unless healthy_node?
try_obtain_lease do
log_info('Repository backfilling started')
reason = :unknown
begin
connection.send_query("#{projects_ids_unsynced.to_sql};#{project_ids_updated_recently.to_sql}")
connection.set_single_row_mode
reason = loop do
break :node_disabled unless node_enabled?
break :over_time if over_time?
break :lease_lost unless renew_lease!
update_jobs_in_progress
unless over_capacity?
# This will stream the results one by one
# until there are no more results to fetch.
result = connection.get_result
break :complete if result.nil?
result.check
result.each do |row|
schedule_job(row['id'])
end
else
sleep(1)
end
end
rescue => error
reason = :error
log_error('Repository backfilling error', error)
raise error
ensure
log_info('Repository backfilling finished', total_loops: loops, duration: Time.now.utc - start_time, reason: reason)
end
end
end
private private
attr_reader :scheduled_jobs attr_reader :scheduled_jobs
...@@ -83,6 +33,44 @@ module Geo ...@@ -83,6 +33,44 @@ module Geo
scheduled_jobs.size >= max_capacity scheduled_jobs.size >= max_capacity
end end
def schedule_jobs
log_info('Repository backfilling started')
reason = :unknown
begin
connection.send_query("#{projects_ids_unsynced.to_sql};#{project_ids_updated_recently.to_sql}")
connection.set_single_row_mode
reason = loop do
break :node_disabled unless node_enabled?
break :over_time if over_time?
break :lease_lost unless renew_lease!
update_jobs_in_progress
unless over_capacity?
# This will stream the results one by one
# until there are no more results to fetch.
result = connection.get_result
break :complete if result.nil?
result.check
result.each do |row|
schedule_job(row['id'])
end
else
sleep(1)
end
end
rescue => error
reason = :error
log_error('Repository backfilling error', error)
raise error
ensure
log_info('Repository backfilling finished', total_loops: loops, duration: Time.now.utc - start_time, reason: reason)
end
end
def schedule_job(project_id) def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_async(project_id, sync_repository: true, sync_wiki: true) job_id = Geo::ProjectSyncWorker.perform_async(project_id, sync_repository: true, sync_wiki: true)
...@@ -94,6 +82,10 @@ module Geo ...@@ -94,6 +82,10 @@ module Geo
end end
end end
def scheduled_job_ids
scheduled_jobs.map { |data| data[:job_id] }
end
def update_jobs_in_progress def update_jobs_in_progress
job_ids = scheduled_job_ids job_ids = scheduled_job_ids
return if job_ids.empty? return if job_ids.empty?
...@@ -106,10 +98,6 @@ module Geo ...@@ -106,10 +98,6 @@ module Geo
end end
end end
def scheduled_job_ids
scheduled_jobs.map { |data| data[:job_id] }
end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def projects_ids_unsynced def projects_ids_unsynced
Geo::ProjectUnsyncedFinder Geo::ProjectUnsyncedFinder
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment