Schedule a job projects where last sync failed or was updated recently

parent e21422f0
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
# .execute. # .execute.
module Geo module Geo
class ProjectUpdatedRecentlyFinder class ProjectUpdatedRecentlyFinder
def initialize(current_node:, shard_name:, batch_size:) def initialize(current_node:, shard_name:, batch_size: nil)
@current_node = Geo::Fdw::GeoNode.find(current_node.id) @current_node = Geo::Fdw::GeoNode.find(current_node.id)
@shard_name = shard_name @shard_name = shard_name
@batch_size = batch_size @batch_size = batch_size
...@@ -20,10 +20,9 @@ module Geo ...@@ -20,10 +20,9 @@ module Geo
def execute def execute
return Geo::Fdw::Project.none unless valid_shard? return Geo::Fdw::Project.none unless valid_shard?
projects relation = projects.recently_updated.within_shards(shard_name)
.recently_updated relation = relation.limit(batch_size) unless batch_size.nil?
.within_shards(shard_name) relation
.limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
......
...@@ -21,7 +21,7 @@ module Geo ...@@ -21,7 +21,7 @@ module Geo
reason = :unknown reason = :unknown
begin begin
connection.send_query(unsynced_projects_ids.to_sql) connection.send_query("#{projects_ids_unsynced.to_sql};#{project_ids_updated_recently.to_sql}")
connection.set_single_row_mode connection.set_single_row_mode
reason = loop do reason = loop do
...@@ -106,7 +106,7 @@ module Geo ...@@ -106,7 +106,7 @@ module Geo
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def unsynced_projects_ids def projects_ids_unsynced
Geo::ProjectUnsyncedFinder Geo::ProjectUnsyncedFinder
.new(current_node: Gitlab::Geo.current_node, shard_name: shard_name) .new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
.execute .execute
...@@ -114,6 +114,16 @@ module Geo ...@@ -114,6 +114,16 @@ module Geo
.select(:id) .select(:id)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def project_ids_updated_recently
Geo::ProjectUpdatedRecentlyFinder
.new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
.execute
.order('project_registry.last_repository_synced_at ASC NULLS FIRST, projects.last_repository_updated_at ASC')
.select(:id)
end
# rubocop: enable CodeReuse/ActiveRecord
end end
end end
end end
...@@ -69,6 +69,25 @@ describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab ...@@ -69,6 +69,25 @@ describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab
subject.perform(shard_name) subject.perform(shard_name)
end end
it 'schedules a job for each project where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed)
create(:geo_project_registry, :synced)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(true)
subject.perform(shard_name)
end
it 'schedules a job for each synced project updated recently' do
create(:geo_project_registry, :synced, :repository_dirty)
create(:geo_project_registry, :synced)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(true)
subject.perform(shard_name)
end
it 'respects Geo secondary node max capacity per shard' do it 'respects Geo secondary node max capacity per shard' do
stub_healthy_shards([shard_name, 'shard2', 'shard3', 'shard4', 'shard5']) stub_healthy_shards([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
allow(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return('jid-123') allow(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return('jid-123')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment