Commit cdfb06dc authored by Valery Sizov's avatar Valery Sizov

Geo: Handle race condition on containers syncs

Using optimistic locking when finishing container sync
parent 3e23689a
---
title: 'Geo: Fix race condition for container synchronization'
merge_request: 17823
author:
type: fixed
......@@ -20,12 +20,6 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
registry.last_synced_at = Time.now
end
before_transition any => :synced do |registry, _|
registry.retry_count = 0
registry.retry_at = nil
registry.last_sync_failure = nil
end
before_transition any => :pending do |registry, _|
registry.retry_at = 0
registry.retry_count = 0
......@@ -35,10 +29,6 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
transition [:synced, :failed, :pending] => :started
end
event :finish_sync! do
transition started: :synced
end
event :repository_updated! do
transition [:synced, :failed, :started] => :pending
end
......@@ -58,4 +48,26 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
retry_at: next_retry_time(new_retry_count)
)
end
def finish_sync!
update!(
retry_count: 0,
last_sync_failure: nil,
retry_at: nil
)
mark_synced_atomically
end
def mark_synced_atomically
# We can only update registry if state is started.
# If state is set to pending that means that repository_updated! was called
# during the sync so we need to reschedule new sync
num_rows = self.class
.where(container_repository_id: container_repository_id)
.where(state: 'started')
.update_all(state: 'synced')
num_rows > 0
end
end
......@@ -4,11 +4,8 @@ require 'tempfile'
module Geo
class ContainerRepositorySync
include ExclusiveLeaseGuard
include Gitlab::Utils::StrongMemoize
LEASE_TIMEOUT = 1.hour.freeze
attr_reader :name, :container_repository
def initialize(container_repository)
......@@ -17,19 +14,15 @@ module Geo
end
def execute
try_obtain_lease do
# It makes sense to do this sequentially because in most cases images
# share some layers so it can save IO ops.
tags_to_sync.each do |tag|
sync_tag(tag[:name])
end
tags_to_remove.each do |tag|
container_repository.delete_tag_by_digest(tag[:digest])
end
tags_to_sync.each do |tag|
sync_tag(tag[:name])
end
true
tags_to_remove.each do |tag|
container_repository.delete_tag_by_digest(tag[:digest])
end
true
end
private
......@@ -87,14 +80,6 @@ module Geo
secondary_tags - primary_tags
end
def lease_key
@lease_key ||= "#{self.class.name}:#{name}"
end
def lease_timeout
LEASE_TIMEOUT
end
# The client for primary registry
def client
strong_memoize(:client) do
......
......@@ -2,8 +2,12 @@
module Geo
class ContainerRepositorySyncService
include ExclusiveLeaseGuard
include ::Gitlab::Geo::ContainerRepositoryLogHelpers
LEASE_TIMEOUT = 8.hours.freeze
LEASE_KEY = 'geo_container_sync'
attr_reader :container_repository
def initialize(container_repository)
......@@ -11,12 +15,19 @@ module Geo
end
def execute
try_obtain_lease do
sync_repository
end
end
def sync_repository
log_info('Marking sync as started')
registry.start_sync!
Geo::ContainerRepositorySync.new(container_repository).execute
registry.finish_sync!
mark_sync_as_successful
log_info('Finished sync')
rescue => e
fail_registry_sync!("Container repository sync failed", e)
......@@ -24,12 +35,32 @@ module Geo
private
def mark_sync_as_successful
persisted = registry.finish_sync!
reschedule_sync unless persisted
end
def reschedule_sync
log_info("Reschedule container sync because a ContainerRepositoryUpdatedEvent was processed during the sync")
Geo::ContainerRepositorySyncWorker.perform_async(container_repository.id)
end
def fail_registry_sync!(message, error)
log_error(message, error)
registry.fail_sync!(message, error)
end
def lease_key
@lease_key ||= "#{LEASE_KEY}:#{container_repository.id}"
end
def lease_timeout
LEASE_TIMEOUT
end
# rubocop: disable CodeReuse/ActiveRecord
def registry
@registry ||= begin
......
......@@ -14,8 +14,10 @@ module Geo
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
end
attr_reader :repository
def perform(id)
repository = ContainerRepository.find_by_id(id)
@repository = ContainerRepository.find_by_id(id)
if repository.nil?
log_error("Couldn't find container repository, skipping syncing", container_repository_id: id)
......
......@@ -3,7 +3,7 @@
require 'spec_helper'
describe Geo::ContainerRepositoryRegistry, :geo do
set(:container_repository_registry) { create(:container_repository_registry) }
set(:registry) { create(:container_repository_registry) }
describe 'relationships' do
it { is_expected.to belong_to(:container_repository) }
......@@ -20,7 +20,7 @@ describe Geo::ContainerRepositoryRegistry, :geo do
result = described_class.repository_id_not_in([container_repository1_id, container_repository2_id])
expect(result).to match_ids([container_repository_registry])
expect(result).to match_ids([registry])
end
end
end
......@@ -30,17 +30,42 @@ describe Geo::ContainerRepositoryRegistry, :geo do
end
describe '#finish_sync!' do
it 'finishes registry record' do
container_repository_registry = create(:container_repository_registry, :sync_started)
let(:registry) { create(:container_repository_registry, :sync_started) }
container_repository_registry.finish_sync!
it 'finishes registry record' do
registry.finish_sync!
expect(container_repository_registry.reload).to have_attributes(
expect(registry.reload).to have_attributes(
retry_count: 0,
retry_at: nil,
last_sync_failure: nil,
state: 'synced'
)
end
context 'when a container sync was scheduled after the last sync began' do
before do
registry.update!(
state: 'pending',
retry_count: 2,
retry_at: 1.hour.ago,
last_sync_failure: 'error'
)
registry.finish_sync!
end
it 'does not reset state' do
expect(registry.reload.state).to eq 'pending'
end
it 'resets the other sync state fields' do
expect(registry.reload).to have_attributes(
retry_count: 0,
retry_at: nil,
last_sync_failure: nil
)
end
end
end
end
......@@ -8,34 +8,70 @@ describe Geo::ContainerRepositorySyncService, :geo do
set(:secondary) { create(:geo_node) }
let(:registry) { create(:container_repository_registry, :sync_started) }
let(:container_repository) { registry.container_repository }
let(:lease_key) { "#{Geo::ContainerRepositorySyncService::LEASE_KEY}:#{container_repository.id}" }
let(:lease_uuid) { 'uuid'}
subject { described_class.new(container_repository) }
before do
stub_current_geo_node(secondary)
end
describe '#execute' do
let(:container_repository_registry) { create(:container_repository_registry, :sync_started) }
context 'lease handling' do
before do
stub_exclusive_lease(lease_key, lease_uuid)
end
it 'returns the lease when sync succeeds' do
registry
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
allow_any_instance_of(Geo::ContainerRepositorySync).to receive(:execute)
subject.execute
end
it 'returns the lease when sync fails' do
allow_any_instance_of(Geo::ContainerRepositorySync).to receive(:execute)
.and_raise(StandardError)
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
subject.execute
end
it 'skips syncing repositories if cannot obtain a lease' do
stub_exclusive_lease_taken(lease_key)
expect_any_instance_of(Geo::ContainerRepositorySync).not_to receive(:execute)
subject.execute
end
end
describe '#execute' do
it 'fails registry record if there was exception' do
allow_any_instance_of(Geo::ContainerRepositorySync)
.to receive(:execute).and_raise 'Sync Error'
described_class.new(container_repository_registry.container_repository).execute
described_class.new(registry.container_repository).execute
expect(container_repository_registry.reload.failed?).to be_truthy
expect(registry.reload.failed?).to be_truthy
end
it 'finishes registry record if there was no exception' do
expect_any_instance_of(Geo::ContainerRepositorySync)
.to receive(:execute)
described_class.new(container_repository_registry.container_repository).execute
described_class.new(registry.container_repository).execute
expect(container_repository_registry.reload.synced?).to be_truthy
expect(registry.reload.synced?).to be_truthy
end
it 'finishes registry record if there was no exception and registy does not exist' do
container_repository = create(:container_repository)
expect_any_instance_of(Geo::ContainerRepositorySync)
.to receive(:execute)
......@@ -46,4 +82,15 @@ describe Geo::ContainerRepositorySyncService, :geo do
expect(registry.synced?).to be_truthy
end
end
context 'race condition when ContainerRepositoryUpdatedEvent was processed during a sync' do
it 'reschedules the sync' do
allow_any_instance_of(described_class).to receive(:registry).and_return(registry)
expect(::Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
expect(registry).to receive(:finish_sync!).and_return(false)
described_class.new(registry.container_repository).send(:mark_sync_as_successful)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment