Commit 428fbf6e authored by Valery Sizov's avatar Valery Sizov

Geo: Handle race condition on design update

Using optimistic locking when finishing design sync
parent ca1b4b17
......@@ -46,16 +46,28 @@ class Geo::DesignRegistry < Geo::BaseRegistry
update!(attrs)
end
# TODO: This method has to use optimistic locking to update state
def finish_sync!(missing_on_primary = false)
update!(
state: :synced,
missing_on_primary: missing_on_primary,
retry_count: 0,
last_sync_failure: nil,
retry_at: nil,
force_to_redownload: false
)
mark_synced_atomically
end
def mark_synced_atomically
# We can only update registry if state is started.
# If state is set to pending that means that repository_updated! was called
# during the sync so we need to reschedule new sync
num_rows = self.class
.where(project_id: project_id)
.where(state: 'started')
.update_all(state: 'synced')
num_rows > 0
end
def should_be_redownloaded?
......
......@@ -57,11 +57,19 @@ module Geo
def mark_sync_as_successful(missing_on_primary: false)
log_info("Marking design sync as successful")
registry.finish_sync!(missing_on_primary)
persisted = registry.finish_sync!(missing_on_primary)
reschedule_sync unless persisted
log_info("Finished design sync", download_time_s: download_time_in_seconds)
end
def reschedule_sync
log_info("Reschedule design sync because a RepositoryUpdateEvent was processed during the sync")
::Geo::DesignRepositorySyncWorker.perform_async(project.id)
end
# rubocop: disable CodeReuse/ActiveRecord
def registry
@registry ||= Geo::DesignRegistry.find_or_initialize_by(project_id: project.id)
......
......@@ -33,6 +33,7 @@
- geo:geo_hashed_storage_migration
- geo:geo_project_sync
- geo:geo_container_repository_sync
- geo:geo_design_repository_sync
- geo:geo_rename_repository
- geo:geo_secondary_repository_backfill
- geo:geo_repositories_clean_up
......
# frozen_string_literal: true
module Geo
class DesignRepositorySyncWorker
include ApplicationWorker
include GeoQueue
include Gitlab::Geo::LogHelpers
sidekiq_options retry: 3, dead: false
sidekiq_retry_in { |count| 30 * count }
sidekiq_retries_exhausted do |msg, _|
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
end
def perform(project_id)
return if Feature.disabled?(:enable_geo_design_sync)
registry = Geo::DesignRegistry.find_or_initialize_by(project_id: project_id) # rubocop: disable CodeReuse/ActiveRecord
project = registry.project
if project.nil?
log_error("Couldn't find project, skipping syncing", project_id: project_id)
return
end
Geo::DesignRepositorySyncService.new(registry.project).execute
end
end
end
......@@ -14,9 +14,9 @@ describe Geo::DesignRegistry, :geo do
end
describe '#finish_sync!' do
it 'finishes registry record' do
design_registry = create(:geo_design_registry, :sync_started)
let(:design_registry) { create(:geo_design_registry, :sync_started) }
it 'finishes registry record' do
design_registry.finish_sync!
expect(design_registry.reload).to have_attributes(
......@@ -28,6 +28,35 @@ describe Geo::DesignRegistry, :geo do
force_to_redownload: false
)
end
context 'when a design sync was scheduled after the last sync began' do
before do
design_registry.update!(
state: 'pending',
retry_count: 2,
retry_at: 1.hour.ago,
force_to_redownload: true,
last_sync_failure: 'error',
missing_on_primary: true
)
design_registry.finish_sync!
end
it 'does not reset state' do
expect(design_registry.reload.state).to eq 'pending'
end
it 'resets the other sync state fields' do
expect(design_registry.reload).to have_attributes(
retry_count: 0,
retry_at: nil,
force_to_redownload: false,
last_sync_failure: nil,
missing_on_primary: false
)
end
end
end
describe '#should_be_redownloaded?' do
......
......@@ -137,4 +137,15 @@ describe Geo::DesignRepositorySyncService do
end
end
end
context 'race condition when RepositoryUpdatedEvent was processed during a sync' do
let(:registry) { subject.send(:registry) }
it 'reschedules the sync' do
expect(::Geo::DesignRepositorySyncWorker).to receive(:perform_async)
expect(registry).to receive(:finish_sync!).and_return(false)
subject.send(:mark_sync_as_successful)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::DesignRepositorySyncWorker, :geo do
describe '#perform' do
it 'runs DesignRepositorySyncService' do
project = create(:project)
service = spy(:service)
expect(Geo::DesignRepositorySyncService).to receive(:new).with(project).and_return(service)
described_class.new.perform(project.id)
expect(service).to have_received(:execute)
end
it 'does not run DesignRepositorySyncService if feature is disabled' do
project = create(:project)
stub_feature_flags(enable_geo_design_sync: false)
expect(Geo::DesignRepositorySyncService).not_to receive(:new)
described_class.new.perform(project.id)
end
it 'logs error when repository does not exist' do
worker = described_class.new
expect(worker).to receive(:log_error)
.with("Couldn't find project, skipping syncing", project_id: 20)
expect(Geo::DesignRepositorySyncService).not_to receive(:new)
worker.perform(20)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment