Commit e916e727 authored by Douwe Maan's avatar Douwe Maan

Merge branch 'increase-parallelism-of-geo-repository-sync-worker' into 'master'

Increase parallelism of geo repository sync worker

Closes #2742

See merge request !2351
parents 294d1804 c3aa2dee
...@@ -10,4 +10,20 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -10,4 +10,20 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
where.not(last_repository_synced_at: nil, last_repository_successful_sync_at: nil) where.not(last_repository_synced_at: nil, last_repository_successful_sync_at: nil)
.where(resync_repository: false, resync_wiki: false) .where(resync_repository: false, resync_wiki: false)
end end
def resync_repository?
resync_repository || last_repository_successful_sync_at.nil?
end
def resync_wiki?
resync_wiki || last_wiki_successful_sync_at.nil?
end
def repository_synced_since?(timestamp)
last_repository_synced_at && last_repository_synced_at > timestamp
end
def wiki_synced_since?(timestamp)
last_wiki_synced_at && last_wiki_synced_at > timestamp
end
end end
module Geo
class BaseSyncService
class << self
attr_accessor :type
end
attr_reader :project
LEASE_TIMEOUT = 8.hours.freeze
LEASE_KEY_PREFIX = 'geo_sync_service'.freeze
def initialize(project)
@project = project
end
def execute
try_obtain_lease do
log("Started #{type} sync")
sync_repository
log("Finished #{type} sync")
end
end
def lease_key
@lease_key ||= "#{LEASE_KEY_PREFIX}:#{type}:#{project.id}"
end
private
def registry
@registry ||= Geo::ProjectRegistry.find_or_initialize_by(project_id: project.id)
end
def try_obtain_lease
log("Trying to obtain lease to sync #{type}")
repository_lease = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT).try_obtain
unless repository_lease
log("Could not obtain lease to sync #{type}")
return
end
yield
# We should release the lease for a repository, only if we have obtained
# it. If something went wrong when syncing the repository, we should wait
# for the lease timeout to try again.
log("Releasing leases to sync #{type}")
Gitlab::ExclusiveLease.cancel(lease_key, repository_lease)
end
def update_registry(type, started_at: nil, finished_at: nil)
return unless started_at || finished_at
log("Updating #{type} sync information")
attrs = {}
attrs["last_#{type}_synced_at"] = started_at if started_at
if finished_at
attrs["last_#{type}_successful_sync_at"] = finished_at
attrs["resync_#{type}"] = false
end
registry.update!(attrs)
end
def type
self.class.type
end
def primary_ssh_path_prefix
@primary_ssh_path_prefix ||= Gitlab::Geo.primary_node.clone_url_prefix
end
def log(message)
Rails.logger.info("#{self.class.name}: #{message} for project #{project.path_with_namespace} (#{project.id})")
end
end
end
module Geo module Geo
class RepositorySyncService class RepositorySyncService < BaseSyncService
attr_reader :project_id self.type = :repository
LEASE_TIMEOUT = 8.hours.freeze
LEASE_KEY_PREFIX = 'repository_sync_service'.freeze
def initialize(project_id)
@project_id = project_id
end
def execute
try_obtain_lease do
log('Started repository sync')
sync_project_repository
sync_wiki_repository
log('Finished repository sync')
end
rescue ActiveRecord::RecordNotFound
Rails.logger.error("#{self.class.name}: Couldn't find project with ID=#{project_id}, skipping syncing")
end
private private
def project def sync_repository
@project ||= Project.find(project_id) fetch_project_repository
end
def registry
@registry ||= Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
end
def sync_project_repository
return unless sync_repository?
started_at, finished_at = fetch_project_repository
update_registry(:repository, started_at, finished_at)
expire_repository_caches expire_repository_caches
end end
def sync_repository?
registry.resync_repository? ||
registry.last_repository_successful_sync_at.nil? ||
registry.last_repository_synced_at.nil?
end
def sync_wiki_repository
return unless sync_wiki?
started_at, finished_at = fetch_wiki_repository
update_registry(:wiki, started_at, finished_at)
end
def sync_wiki?
registry.resync_wiki? ||
registry.last_wiki_successful_sync_at.nil? ||
registry.last_wiki_synced_at.nil?
end
def fetch_project_repository def fetch_project_repository
return unless sync_repository?
log('Fetching project repository') log('Fetching project repository')
started_at = DateTime.now update_registry(:repository, started_at: DateTime.now)
finished_at = nil
begin begin
project.ensure_repository project.ensure_repository
project.repository.fetch_geo_mirror(ssh_url_to_repo) project.repository.fetch_geo_mirror(ssh_url_to_repo)
finished_at = DateTime.now update_registry(:repository, finished_at: DateTime.now)
rescue Gitlab::Shell::Error => e rescue Gitlab::Shell::Error => e
Rails.logger.error("#{self.class.name}: Error syncing repository for project #{project.path_with_namespace}: #{e}") Rails.logger.error("#{self.class.name}: Error syncing repository for project #{project.path_with_namespace}: #{e}")
rescue Gitlab::Git::Repository::NoRepository => e rescue Gitlab::Git::Repository::NoRepository => e
...@@ -76,27 +25,6 @@ module Geo ...@@ -76,27 +25,6 @@ module Geo
log('Expiring caches') log('Expiring caches')
project.repository.after_create project.repository.after_create
end end
[started_at, finished_at]
end
def fetch_wiki_repository
return unless sync_wiki?
log('Fetching wiki repository')
started_at = DateTime.now
finished_at = nil
begin
project.wiki.ensure_repository
project.wiki.repository.fetch_geo_mirror(ssh_url_to_wiki)
finished_at = DateTime.now
rescue Gitlab::Git::Repository::NoRepository, Gitlab::Shell::Error, ProjectWiki::CouldNotCreateWikiError => e
Rails.logger.error("#{self.class.name}: Error syncing wiki repository for project #{project.path_with_namespace}: #{e}")
end
[started_at, finished_at]
end end
def expire_repository_caches def expire_repository_caches
...@@ -104,54 +32,8 @@ module Geo ...@@ -104,54 +32,8 @@ module Geo
project.repository.after_sync project.repository.after_sync
end end
def try_obtain_lease
log('Trying to obtain lease to sync repository')
repository_lease = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT).try_obtain
unless repository_lease
log('Could not obtain lease to sync repository')
return
end
yield
# We should release the lease for a repository, only if we have obtained
# it. If something went wrong when syncing the repository, we should wait
# for the lease timeout to try again.
log('Releasing leases to sync repository')
Gitlab::ExclusiveLease.cancel(lease_key, repository_lease)
end
def update_registry(type, started_at, finished_at)
log("Updating #{type} sync information")
registry.public_send("last_#{type}_synced_at=", started_at)
if finished_at
registry.public_send("last_#{type}_successful_sync_at=", finished_at)
registry.public_send("resync_#{type}=", false)
end
registry.save
end
def lease_key
@lease_key ||= "#{LEASE_KEY_PREFIX}:#{project.id}"
end
def primary_ssh_path_prefix
Gitlab::Geo.primary_node.clone_url_prefix
end
def ssh_url_to_repo def ssh_url_to_repo
"#{primary_ssh_path_prefix}#{project.path_with_namespace}.git" "#{primary_ssh_path_prefix}#{project.path_with_namespace}.git"
end end
def ssh_url_to_wiki
"#{primary_ssh_path_prefix}#{project.path_with_namespace}.wiki.git"
end
def log(message)
Rails.logger.info("#{self.class.name}: #{message} for project #{project.path_with_namespace} (#{project.id})")
end
end end
end end
module Geo
class WikiSyncService < BaseSyncService
self.type = :wiki
private
def sync_repository
fetch_wiki_repository
end
def fetch_wiki_repository
log('Fetching wiki repository')
update_registry(:wiki, started_at: DateTime.now)
begin
project.wiki.ensure_repository
project.wiki.repository.fetch_geo_mirror(ssh_url_to_wiki)
update_registry(:wiki, finished_at: DateTime.now)
rescue Gitlab::Git::Repository::NoRepository, Gitlab::Shell::Error, ProjectWiki::CouldNotCreateWikiError => e
Rails.logger.error("#{self.class.name}: Error syncing wiki repository for project #{project.path_with_namespace}: #{e}")
end
end
def ssh_url_to_wiki
"#{primary_ssh_path_prefix}#{project.path_with_namespace}.wiki.git"
end
end
end
module Geo
class ProjectSyncWorker
include Sidekiq::Worker
sidekiq_options queue: :geo, retry: 3, dead: false
sidekiq_retry_in { |count| 30 * count }
sidekiq_retries_exhausted do |msg, _|
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
end
def perform(project_id, scheduled_time)
project = Project.find(project_id)
registry = Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
Geo::RepositorySyncService.new(project).execute if sync_repository?(registry, scheduled_time)
Geo::WikiSyncService.new(project).execute if sync_wiki?(registry, scheduled_time)
rescue ActiveRecord::RecordNotFound
logger.error("Couldn't find project with ID=#{project_id}, skipping syncing")
end
private
def sync_repository?(registry, scheduled_time)
!registry.repository_synced_since?(scheduled_time) &&
registry.resync_repository?
end
def sync_wiki?(registry, scheduled_time)
!registry.wiki_synced_since?(scheduled_time) &&
registry.resync_wiki?
end
end
end
...@@ -2,45 +2,74 @@ class GeoRepositorySyncWorker ...@@ -2,45 +2,74 @@ class GeoRepositorySyncWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue include CronjobQueue
RUN_TIME = 5.minutes.to_i LEASE_KEY = 'geo_repository_sync_worker'.freeze
BATCH_SIZE = 100 LEASE_TIMEOUT = 10.minutes
LAST_SYNC_INTERVAL = 24.hours BATCH_SIZE = 1000
BACKOFF_DELAY = 5.minutes
MAX_CAPACITY = 25
RUN_TIME = 60.minutes.to_i
def initialize
@pending_projects = []
@scheduled_jobs = []
end
def perform def perform
return unless Gitlab::Geo.secondary_role_enabled? return unless Gitlab::Geo.secondary_role_enabled?
return unless Gitlab::Geo.primary_node.present? return unless Gitlab::Geo.primary_node.present?
start_time = Time.now logger.info "Started Geo repository sync scheduler"
project_ids_not_synced = find_project_ids_not_synced
project_ids_updated_recently = find_project_ids_updated_recently
project_ids = interleave(project_ids_not_synced, project_ids_updated_recently)
logger.info "Started Geo repository syncing for #{project_ids.length} project(s)" @start_time = Time.now
project_ids.each do |project_id| # Prevent multiple Sidekiq workers from attempting to schedule projects synchronization
begin try_obtain_lease do
break if over_time?(start_time) loop do
break unless node_enabled? break unless node_enabled?
# We try to obtain a lease here for the entire sync process because we update_jobs_in_progress
# want to sync the repositories continuously at a controlled rate load_pending_projects if reload_queue?
# instead of hammering the primary node. Initially, we are syncing
# one repo at a time. If we don't obtain the lease here, every 5 # If we are still under the limit after refreshing our DB, we can end
# minutes all of 100 projects will be synced. # after scheduling the remaining transfers.
try_obtain_lease do |lease| last_batch = reload_queue?
Geo::RepositorySyncService.new(project_id).execute
end break if over_time?
rescue ActiveRecord::RecordNotFound break unless projects_remain?
logger.error("Couldn't find project with ID=#{project_id}, skipping syncing")
next schedule_jobs
break if last_batch
break unless renew_lease!
sleep(1)
end end
end
logger.info "Finished Geo repository syncing for #{project_ids.length} project(s)" logger.info "Finished Geo repository sync scheduler"
end
end end
private private
def reload_queue?
@pending_projects.size < MAX_CAPACITY
end
def projects_remain?
@pending_projects.size > 0
end
def over_time?
Time.now - @start_time >= RUN_TIME
end
def load_pending_projects
project_ids_not_synced = find_project_ids_not_synced
project_ids_updated_recently = find_project_ids_updated_recently
@pending_projects = interleave(project_ids_not_synced, project_ids_updated_recently)
end
def find_project_ids_not_synced def find_project_ids_not_synced
Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id)) Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id))
.order(last_repository_updated_at: :desc) .order(last_repository_updated_at: :desc)
...@@ -63,37 +92,65 @@ class GeoRepositorySyncWorker ...@@ -63,37 +92,65 @@ class GeoRepositorySyncWorker
end.flatten(1).uniq.compact.take(BATCH_SIZE) end.flatten(1).uniq.compact.take(BATCH_SIZE)
end end
def over_time?(start_time) def schedule_jobs
Time.now - start_time >= RUN_TIME num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_projects.size].min
end return unless projects_remain?
def node_enabled? num_to_schedule.times do
# Only check every minute to avoid polling the DB excessively project_id = @pending_projects.shift
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
@last_enabled_check = Time.now
@current_node_enabled = nil @scheduled_jobs << { id: project_id, job_id: job_id } if job_id
end end
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled? def scheduled_job_ids
@scheduled_jobs.map { |data| data[:job_id] }
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact
end end
def try_obtain_lease def try_obtain_lease
lease = Gitlab::ExclusiveLease.new(lease_key, timeout: lease_timeout).try_obtain lease = exclusive_lease.try_obtain
return unless lease unless lease
logger.info "Cannot obtain an exclusive lease. There must be another worker already in execution."
return
end
begin begin
yield lease yield lease
ensure ensure
Gitlab::ExclusiveLease.cancel(lease_key, lease) release_lease(lease)
end end
end end
def lease_key def exclusive_lease
Geo::RepositorySyncService::LEASE_KEY_PREFIX @lease ||= Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT)
end end
def lease_timeout def renew_lease!
Geo::RepositorySyncService::LEASE_TIMEOUT exclusive_lease.renew
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
@current_node_enabled = nil
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end end
end end
---
title: Geo - Makes the projects synchronization faster on secondaries nodes
merge_request:
author:
...@@ -31,4 +31,86 @@ describe Geo::ProjectRegistry, models: true do ...@@ -31,4 +31,86 @@ describe Geo::ProjectRegistry, models: true do
expect(described_class.synced).to match_array([registry]) expect(described_class.synced).to match_array([registry])
end end
end end
describe '#resync_repository?' do
it 'returns true when resync_repository is true' do
subject.resync_repository = true
expect(subject.resync_repository).to be true
end
it 'returns true when last_repository_successful_sync_at is nil' do
subject.last_repository_successful_sync_at = nil
expect(subject.resync_repository).to be true
end
it 'returns false when resync_repository is false and last_repository_successful_sync_at is present' do
subject.resync_repository = false
subject.last_repository_successful_sync_at = Time.now
expect(subject.resync_repository).to be false
end
end
describe '#resync_wiki?' do
it 'returns true when resync_wiki is true' do
subject.resync_wiki = true
expect(subject.resync_wiki).to be true
end
it 'returns true when last_wiki_successful_sync_at is nil' do
subject.last_wiki_successful_sync_at = nil
expect(subject.resync_wiki).to be true
end
it 'returns false when resync_wiki is false and last_wiki_successful_sync_at is present' do
subject.resync_wiki = false
subject.last_wiki_successful_sync_at = Time.now
expect(subject.resync_wiki).to be false
end
end
describe '#repository_synced_since?' do
it 'returns false when last_repository_synced_at is nil' do
subject.last_repository_synced_at = nil
expect(subject.repository_synced_since?(Time.now)).to be_nil
end
it 'returns false when last_repository_synced_at before timestamp' do
subject.last_repository_synced_at = Time.now - 2.hours
expect(subject.repository_synced_since?(Time.now)).to be false
end
it 'returns true when last_repository_synced_at after timestamp' do
subject.last_repository_synced_at = Time.now + 2.hours
expect(subject.repository_synced_since?(Time.now)).to be true
end
end
describe '#wiki_synced_since?' do
it 'returns false when last_wiki_synced_at is nil' do
subject.last_wiki_synced_at = nil
expect(subject.wiki_synced_since?(Time.now)).to be_nil
end
it 'returns false when last_wiki_synced_at before timestamp' do
subject.last_wiki_synced_at = Time.now - 2.hours
expect(subject.wiki_synced_since?(Time.now)).to be false
end
it 'returns true when last_wiki_synced_at after timestamp' do
subject.last_wiki_synced_at = Time.now + 2.hours
expect(subject.wiki_synced_since?(Time.now)).to be true
end
end
end end
require 'spec_helper'
RSpec.describe Geo::WikiSyncService, services: true do
let!(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
let(:lease) { double(try_obtain: true) }
subject { described_class.new(project) }
before do
allow(Gitlab::ExclusiveLease).to receive(:new)
.with(subject.lease_key, anything)
.and_return(lease)
allow_any_instance_of(Repository).to receive(:fetch_geo_mirror)
.and_return(true)
end
describe '#execute' do
let(:project) { create(:project_empty_repo) }
let(:repository) { project.wiki.repository }
let(:url_to_repo) { "#{primary.clone_url_prefix}#{project.path_with_namespace}.wiki.git" }
it 'fetches wiki repository' do
expect(repository).to receive(:fetch_geo_mirror).with(url_to_repo).once
subject.execute
end
it 'releases lease' do
expect(Gitlab::ExclusiveLease).to receive(:cancel).once.with(
subject.__send__(:lease_key), anything).and_call_original
subject.execute
end
it 'does not fetch wiki repository if cannot obtain a lease' do
allow(lease).to receive(:try_obtain) { false }
expect(repository).not_to receive(:fetch_geo_mirror)
subject.execute
end
it 'rescues exception when Gitlab::Shell::Error is raised' do
allow(repository).to receive(:fetch_geo_mirror).with(url_to_repo) { raise Gitlab::Shell::Error }
expect { subject.execute }.not_to raise_error
end
it 'rescues exception when Gitlab::Git::Repository::NoRepository is raised' do
allow(repository).to receive(:fetch_geo_mirror).with(url_to_repo) { raise Gitlab::Git::Repository::NoRepository }
expect { subject.execute }.not_to raise_error
end
context 'tracking database' do
it 'creates a new registry if does not exists' do
expect { subject.execute }.to change(Geo::ProjectRegistry, :count).by(1)
end
it 'does not create a new registry if one exists' do
create(:geo_project_registry, project: project)
expect { subject.execute }.not_to change(Geo::ProjectRegistry, :count)
end
context 'when repository sync succeed' do
let(:registry) { Geo::ProjectRegistry.find_by(project_id: project.id) }
before do
subject.execute
end
it 'sets last_wiki_synced_at' do
expect(registry.last_wiki_synced_at).not_to be_nil
end
it 'sets last_wiki_successful_sync_at' do
expect(registry.last_wiki_successful_sync_at).not_to be_nil
end
end
context 'when wiki sync fail' do
let(:registry) { Geo::ProjectRegistry.find_by(project_id: project.id) }
before do
allow(repository).to receive(:fetch_geo_mirror).with(url_to_repo) { raise Gitlab::Shell::Error }
subject.execute
end
it 'sets last_wiki_synced_at' do
expect(registry.last_wiki_synced_at).not_to be_nil
end
it 'resets last_wiki_successful_sync_at' do
expect(registry.last_wiki_successful_sync_at).to be_nil
end
end
end
end
end
require 'rails_helper'
RSpec.describe Geo::ProjectSyncWorker do
describe '#perform' do
let(:project) { create(:empty_project) }
let(:repository_sync_service) { spy }
let(:wiki_sync_service) { spy }
before do
allow(Geo::RepositorySyncService).to receive(:new)
.with(instance_of(Project)).once.and_return(repository_sync_service)
allow(Geo::WikiSyncService).to receive(:new)
.with(instance_of(Project)).once.and_return(wiki_sync_service)
end
context 'when project could not be found' do
it 'does not raise an error' do
expect { subject.perform(999, Time.now) }.not_to raise_error
end
end
context 'when project repositories has never been synced' do
it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).to have_received(:execute).once
end
it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).to have_received(:execute).once
end
end
context 'when project repositories has been synced' do
let!(:registry) { create(:geo_project_registry, :synced, project: project) }
it 'does not perform Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).not_to have_received(:execute)
end
it 'does not perform Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).not_to have_received(:execute)
end
end
context 'when last attempt to sync project repositories failed' do
let!(:registry) { create(:geo_project_registry, :sync_failed, project: project) }
it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).to have_received(:execute).once
end
it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).to have_received(:execute).once
end
end
context 'when project repository is dirty' do
let!(:registry) do
create(:geo_project_registry, :synced, :repository_dirty, project: project)
end
it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).to have_received(:execute).once
end
it 'does not perform Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).not_to have_received(:execute)
end
end
context 'when wiki is dirty' do
let!(:registry) do
create(:geo_project_registry, :synced, :wiki_dirty, project: project)
end
it 'does not perform Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).not_to have_received(:execute)
end
it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).to have_received(:execute)
end
end
context 'when project repository was synced after the time the job was scheduled in' do
it 'does not perform Geo::RepositorySyncService for the given project' do
create(:geo_project_registry, :synced, :repository_dirty, project: project, last_repository_synced_at: Time.now)
subject.perform(project.id, Time.now - 5.minutes)
expect(repository_sync_service).not_to have_received(:execute)
end
end
context 'when wiki repository was synced after the time the job was scheduled in' do
it 'does not perform Geo::RepositorySyncService for the given project' do
create(:geo_project_registry, :synced, :wiki_dirty, project: project, last_wiki_synced_at: Time.now)
subject.perform(project.id, Time.now - 5.minutes)
expect(wiki_sync_service).not_to have_received(:execute)
end
end
end
end
...@@ -13,90 +13,51 @@ describe GeoRepositorySyncWorker do ...@@ -13,90 +13,51 @@ describe GeoRepositorySyncWorker do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true } allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
end end
it 'performs Geo::RepositorySyncService for each project' do it 'performs Geo::ProjectSyncWorker for each project' do
expect(Geo::RepositorySyncService).to receive(:new).twice.and_return(spy) expect(Geo::ProjectSyncWorker).to receive(:perform_in).twice.and_return(spy)
subject.perform subject.perform
end end
it 'performs Geo::RepositorySyncService for projects where last attempt to sync failed' do it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
Geo::ProjectRegistry.create( create(:geo_project_registry, :sync_failed, project: project_1)
project: project_1, create(:geo_project_registry, :synced, project: project_2)
last_repository_synced_at: DateTime.now,
last_repository_successful_sync_at: nil
)
Geo::ProjectRegistry.create( expect(Geo::ProjectSyncWorker).to receive(:perform_in).once.and_return(spy)
project: project_2,
last_repository_synced_at: DateTime.now,
last_repository_successful_sync_at: DateTime.now,
resync_repository: false,
resync_wiki: false
)
expect(Geo::RepositorySyncService).to receive(:new).once.and_return(spy)
subject.perform subject.perform
end end
it 'performs Geo::RepositorySyncService for synced projects updated recently' do it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
Geo::ProjectRegistry.create( create(:geo_project_registry, :synced, :repository_dirty, project: project_1)
project: project_1, create(:geo_project_registry, :synced, project: project_2)
last_repository_synced_at: 2.days.ago, create(:geo_project_registry, :synced, :wiki_dirty)
last_repository_successful_sync_at: 2.days.ago,
resync_repository: true, expect(Geo::ProjectSyncWorker).to receive(:perform_in).twice.and_return(spy)
resync_wiki: false
)
Geo::ProjectRegistry.create(
project: project_2,
last_repository_synced_at: 10.minutes.ago,
last_repository_successful_sync_at: 10.minutes.ago,
resync_repository: false,
resync_wiki: false
)
Geo::ProjectRegistry.create(
project: create(:empty_project),
last_repository_synced_at: 5.minutes.ago,
last_repository_successful_sync_at: 5.minutes.ago,
resync_repository: false,
resync_wiki: true
)
expect(Geo::RepositorySyncService).to receive(:new).twice.and_return(spy)
subject.perform subject.perform
end end
it 'does not perform Geo::RepositorySyncService when secondary role is disabled' do it 'does not perform Geo::ProjectSyncWorker when secondary role is disabled' do
allow(Gitlab::Geo).to receive(:secondary_role_enabled?) { false } allow(Gitlab::Geo).to receive(:secondary_role_enabled?) { false }
expect(Geo::RepositorySyncService).not_to receive(:new) expect(Geo::ProjectSyncWorker).not_to receive(:perform_in)
subject.perform subject.perform
end end
it 'does not perform Geo::RepositorySyncService when primary node does not exists' do it 'does not perform Geo::ProjectSyncWorker when primary node does not exists' do
allow(Gitlab::Geo).to receive(:primary_node) { nil } allow(Gitlab::Geo).to receive(:primary_node) { nil }
expect(Geo::RepositorySyncService).not_to receive(:new) expect(Geo::ProjectSyncWorker).not_to receive(:perform_in)
subject.perform subject.perform
end end
it 'does not perform Geo::RepositorySyncService when node is disabled' do it 'does not perform Geo::ProjectSyncWorker when node is disabled' do
allow_any_instance_of(GeoNode).to receive(:enabled?) { false } allow_any_instance_of(GeoNode).to receive(:enabled?) { false }
expect(Geo::RepositorySyncService).not_to receive(:new) expect(Geo::ProjectSyncWorker).not_to receive(:perform_in)
subject.perform
end
it 'does not perform Geo::RepositorySyncService when can not obtain a lease' do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { false }
expect(Geo::RepositorySyncService).not_to receive(:new)
subject.perform subject.perform
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment