Commit 966b4e92 authored by Valery Sizov

Handle repository and wiki in Geo::ProjectSyncWorker independently

Currently, when we need to initiate a repository resync, we spawn
ProjectSyncWorker, where the need for the sync is checked again
because we don't know for exactly what purpose the worker
was spawned, so we check the status of both the wiki and the repo.
This is suboptimal. We also need this change to be flexible if we need
to spawn a sync that is scheduled in the future.
parent e25b9c28
...@@ -223,11 +223,19 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -223,11 +223,19 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
end end
def repository_sync_due?(scheduled_time) def repository_sync_due?(scheduled_time)
never_synced_repository? || repository_sync_needed?(scheduled_time) return true if last_repository_synced_at.nil?
return false unless resync_repository?
return false if repository_retry_at && scheduled_time < repository_retry_at
scheduled_time > last_repository_synced_at
end end
def wiki_sync_due?(scheduled_time) def wiki_sync_due?(scheduled_time)
never_synced_wiki? || wiki_sync_needed?(scheduled_time) return true if last_wiki_synced_at.nil?
return false unless resync_wiki?
return false if wiki_retry_at && scheduled_time < wiki_retry_at
scheduled_time > last_wiki_synced_at
end end
# Returns whether repository is pending verification check # Returns whether repository is pending verification check
...@@ -365,28 +373,6 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -365,28 +373,6 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
"projects/#{project_id}/fetches_since_gc" "projects/#{project_id}/fetches_since_gc"
end end
def never_synced_repository?
last_repository_synced_at.nil?
end
def never_synced_wiki?
last_wiki_synced_at.nil?
end
def repository_sync_needed?(timestamp)
return false unless resync_repository?
return false if repository_retry_at && timestamp < repository_retry_at
last_repository_synced_at && timestamp > last_repository_synced_at
end
def wiki_sync_needed?(timestamp)
return false unless resync_wiki?
return false if wiki_retry_at && timestamp < wiki_retry_at
last_wiki_synced_at && timestamp > last_wiki_synced_at
end
# How many times have we retried syncing it? # How many times have we retried syncing it?
# #
# @param [String] type must be one of the values in TYPES # @param [String] type must be one of the values in TYPES
......
...@@ -158,7 +158,11 @@ module Geo ...@@ -158,7 +158,11 @@ module Geo
def reschedule_sync def reschedule_sync
log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync") log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync")
::Geo::ProjectSyncWorker.perform_async(project.id, Time.now) ::Geo::ProjectSyncWorker.perform_async(
project.id,
sync_repository: type.repository?,
sync_wiki: type.wiki?
)
end end
def fail_registry!(message, error, attrs = {}) def fail_registry!(message, error, attrs = {})
...@@ -170,7 +174,7 @@ module Geo ...@@ -170,7 +174,7 @@ module Geo
end end
def type def type
self.class.type @type ||= self.class.type.to_s.inquiry
end end
def update_delay_in_seconds def update_delay_in_seconds
......
...@@ -15,7 +15,7 @@ module Geo ...@@ -15,7 +15,7 @@ module Geo
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(project_id, scheduled_time) def perform(project_id, options = {})
registry = Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id) registry = Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
project = registry.project project = registry.project
...@@ -30,9 +30,37 @@ module Geo ...@@ -30,9 +30,37 @@ module Geo
return return
end end
Geo::RepositorySyncService.new(project).execute if registry.repository_sync_due?(scheduled_time) options = extract_options(registry, options)
Geo::WikiSyncService.new(project).execute if registry.wiki_sync_due?(scheduled_time)
sync_repository(registry, options)
sync_wiki(registry, options)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
def sync_repository(registry, options)
return unless options[:sync_repository] && registry.resync_repository?
Geo::RepositorySyncService.new(registry.project).execute
end
def sync_wiki(registry, options)
return unless options[:sync_wiki] && registry.resync_wiki?
Geo::WikiSyncService.new(registry.project).execute
end
def extract_options(registry, options)
options.is_a?(Hash) ? options.symbolize_keys : backward_options(registry, options)
end
# Before GitLab 11.8 we used to pass the scheduled time instead of an options hash.
# This method makes the job arguments backward compatible and
# can be removed in any version after GitLab 12.0.
def backward_options(registry, schedule_time)
{
sync_repository: registry.repository_sync_due?(schedule_time),
sync_wiki: registry.wiki_sync_due?(schedule_time)
}
end
end end
end end
...@@ -42,11 +42,19 @@ module Geo ...@@ -42,11 +42,19 @@ module Geo
[1, capacity_per_shard.to_i].max [1, capacity_per_shard.to_i].max
end end
# rubocop: disable CodeReuse/ActiveRecord
def schedule_job(project_id) def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_async(project_id, Time.now) registry = Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
job_id = Geo::ProjectSyncWorker.perform_async(
project_id,
sync_repository: registry.repository_sync_due?(Time.now),
sync_wiki: registry.wiki_sync_due?(Time.now)
)
{ project_id: project_id, job_id: job_id } if job_id { project_id: project_id, job_id: job_id } if job_id
end end
# rubocop: enable CodeReuse/ActiveRecord
def scheduled_project_ids def scheduled_project_ids
scheduled_jobs.map { |data| data[:project_id] } scheduled_jobs.map { |data| data[:project_id] }
......
---
title: 'Geo: Handle repository and wiki sync separately in Geo::ProjectSyncWorker'
merge_request: 9360
author:
type: changed
...@@ -12,7 +12,7 @@ module Gitlab ...@@ -12,7 +12,7 @@ module Gitlab
registry.repository_created!(event) registry.repository_created!(event)
enqueue_job_if_shard_healthy(event) do enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now) ::Geo::ProjectSyncWorker.perform_async(event.project_id, sync_repository: true, sync_wiki: true)
end end
end end
......
...@@ -11,7 +11,11 @@ module Gitlab ...@@ -11,7 +11,11 @@ module Gitlab
registry.repository_updated!(event.source, scheduled_at) registry.repository_updated!(event.source, scheduled_at)
job_id = enqueue_job_if_shard_healthy(event) do job_id = enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, scheduled_at) ::Geo::ProjectSyncWorker.perform_async(
event.project_id,
sync_repository: event.repository?,
sync_wiki: event.wiki?
)
end end
log_event(job_id) log_event(job_id)
......
...@@ -21,7 +21,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c ...@@ -21,7 +21,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('broken').and_return(false) allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('broken').and_return(false)
end end
RSpec.shared_examples 'RepositoryUpdatedEvent' do shared_examples 'RepositoryUpdatedEvent' do
it 'creates a new project registry if it does not exist' do it 'creates a new project registry if it does not exist' do
expect { subject.process }.to change(Geo::ProjectRegistry, :count).by(1) expect { subject.process }.to change(Geo::ProjectRegistry, :count).by(1)
end end
...@@ -108,9 +108,37 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c ...@@ -108,9 +108,37 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
it_behaves_like 'RepositoryUpdatedEvent' it_behaves_like 'RepositoryUpdatedEvent'
it 'schedules a Geo::ProjectSyncWorker' do it 'schedules a Geo::ProjectSyncWorker' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, now).once expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, sync_repository: true, sync_wiki: false).once
Timecop.freeze(now) { subject.process } subject.process
end
context 'enqueues the job with the proper args' do
let!(:registry) { create(:geo_project_registry, :synced, project: repository_updated_event.project) }
before do
repository_updated_event.update!(source: event_source)
end
context 'enqueues wiki sync' do
let(:event_source) { Geo::RepositoryUpdatedEvent::WIKI }
it 'passes correct options' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, { sync_repository: false, sync_wiki: true })
subject.process
end
end
context 'enqueues repository sync' do
let(:event_source) { Geo::RepositoryUpdatedEvent::REPOSITORY }
it 'passes correct options' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, { sync_repository: true, sync_wiki: false })
subject.process
end
end
end end
end end
...@@ -120,9 +148,9 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c ...@@ -120,9 +148,9 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
it_behaves_like 'RepositoryUpdatedEvent' it_behaves_like 'RepositoryUpdatedEvent'
it 'does not schedule a Geo::ProjectSyncWorker job' do it 'does not schedule a Geo::ProjectSyncWorker job' do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, now) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
Timecop.freeze(now) { subject.process } subject.process
end end
end end
end end
......
...@@ -21,11 +21,20 @@ RSpec.describe Geo::ProjectSyncWorker do ...@@ -21,11 +21,20 @@ RSpec.describe Geo::ProjectSyncWorker do
.with(instance_of(Project)).once.and_return(wiki_sync_service) .with(instance_of(Project)).once.and_return(wiki_sync_service)
end end
context 'backward compatibility' do
it 'performs sync for the given project when time is passed' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).to have_received(:execute)
expect(wiki_sync_service).to have_received(:execute)
end
end
context 'when project could not be found' do context 'when project could not be found' do
it 'logs an error and returns' do it 'logs an error and returns' do
expect(subject).to receive(:log_error).with("Couldn't find project, skipping syncing", project_id: 999) expect(subject).to receive(:log_error).with("Couldn't find project, skipping syncing", project_id: 999)
expect { subject.perform(999, Time.now) }.not_to raise_error expect { subject.perform(999) }.not_to raise_error
end end
end end
...@@ -35,21 +44,23 @@ RSpec.describe Geo::ProjectSyncWorker do ...@@ -35,21 +44,23 @@ RSpec.describe Geo::ProjectSyncWorker do
expect(repository_sync_service).not_to receive(:execute) expect(repository_sync_service).not_to receive(:execute)
expect(wiki_sync_service).not_to receive(:execute) expect(wiki_sync_service).not_to receive(:execute)
subject.perform(project_with_broken_storage.id, Time.now) subject.perform(project_with_broken_storage.id)
end end
end end
context 'when project repositories has never been synced' do context 'when project repositories has never been synced' do
it 'performs Geo::RepositorySyncService for the given project' do it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now) subject.perform(project.id, sync_repository: true)
expect(repository_sync_service).to have_received(:execute).once expect(repository_sync_service).to have_received(:execute).once
expect(wiki_sync_service).not_to have_received(:execute)
end end
it 'performs Geo::WikiSyncService for the given project' do it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now) subject.perform(project.id, sync_wiki: true)
expect(wiki_sync_service).to have_received(:execute).once expect(wiki_sync_service).to have_received(:execute).once
expect(repository_sync_service).not_to have_received(:execute)
end end
end end
...@@ -57,13 +68,13 @@ RSpec.describe Geo::ProjectSyncWorker do ...@@ -57,13 +68,13 @@ RSpec.describe Geo::ProjectSyncWorker do
let!(:registry) { create(:geo_project_registry, :synced, project: project) } let!(:registry) { create(:geo_project_registry, :synced, project: project) }
it 'does not perform Geo::RepositorySyncService for the given project' do it 'does not perform Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now) subject.perform(project.id, sync_repository: true)
expect(repository_sync_service).not_to have_received(:execute) expect(repository_sync_service).not_to have_received(:execute)
end end
it 'does not perform Geo::WikiSyncService for the given project' do it 'does not perform Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now) subject.perform(project.id, sync_wiki: true)
expect(wiki_sync_service).not_to have_received(:execute) expect(wiki_sync_service).not_to have_received(:execute)
end end
...@@ -73,72 +84,16 @@ RSpec.describe Geo::ProjectSyncWorker do ...@@ -73,72 +84,16 @@ RSpec.describe Geo::ProjectSyncWorker do
let!(:registry) { create(:geo_project_registry, :sync_failed, project: project) } let!(:registry) { create(:geo_project_registry, :sync_failed, project: project) }
it 'performs Geo::RepositorySyncService for the given project' do it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now) subject.perform(project.id, sync_repository: true)
expect(repository_sync_service).to have_received(:execute).once expect(repository_sync_service).to have_received(:execute).once
end end
it 'performs Geo::WikiSyncService for the given project' do it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now) subject.perform(project.id, sync_wiki: true)
expect(wiki_sync_service).to have_received(:execute).once expect(wiki_sync_service).to have_received(:execute).once
end end
end end
context 'when project repository is dirty' do
let!(:registry) do
create(:geo_project_registry, :synced, :repository_dirty, project: project)
end
it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).to have_received(:execute).once
end
it 'does not perform Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).not_to have_received(:execute)
end
end
context 'when wiki is dirty' do
let!(:registry) do
create(:geo_project_registry, :synced, :wiki_dirty, project: project)
end
it 'does not perform Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).not_to have_received(:execute)
end
it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).to have_received(:execute)
end
end
context 'when project repository was synced after the time the job was scheduled in' do
it 'does not perform Geo::RepositorySyncService for the given project' do
create(:geo_project_registry, :synced, :repository_dirty, project: project, last_repository_synced_at: Time.now)
subject.perform(project.id, Time.now - 5.minutes)
expect(repository_sync_service).not_to have_received(:execute)
end
end
context 'when wiki repository was synced after the time the job was scheduled in' do
it 'does not perform Geo::RepositorySyncService for the given project' do
create(:geo_project_registry, :synced, :wiki_dirty, project: project, last_wiki_synced_at: Time.now)
subject.perform(project.id, Time.now - 5.minutes)
expect(wiki_sync_service).not_to have_received(:execute)
end
end
end end
end end
...@@ -122,7 +122,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -122,7 +122,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async) expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, within(1.minute).of(Time.now)) .with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: true)
.once .once
.and_return(spy) .and_return(spy)
...@@ -134,7 +134,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -134,7 +134,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project) create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async) expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, within(1.minute).of(Time.now)) .with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
.once .once
.and_return(spy) .and_return(spy)
...@@ -182,6 +182,32 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -182,6 +182,32 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
end end
end end
context 'projects that require resync' do
context 'when project repository is dirty' do
it 'syncs repository only' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: true, sync_wiki: false)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
subject.perform(shard_name)
end
end
context 'when project wiki is dirty' do
it 'syncs wiki only' do
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: false, sync_wiki: true)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: false, sync_wiki: true)
subject.perform(shard_name)
end
end
end
context 'all repositories fail' do context 'all repositories fail' do
let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) } let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment