Commit 127c60a0 authored by Gabriel Mazetto's avatar Gabriel Mazetto

Refactor project registry to ensure valid `type`

`#repository_updated!` receives `type` instead of `event
parent 52e03bc3
...@@ -77,7 +77,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -77,7 +77,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
end end
# Must be run before fetching the repository to avoid a race condition # Must be run before fetching the repository to avoid a race condition
#
# @param [String] type must be one of the values in TYPES
# @see REGISTRY_TYPES
def start_sync!(type) def start_sync!(type)
ensure_valid_type!(type)
new_count = retry_count(type) + 1 new_count = retry_count(type) + 1
update!( update!(
...@@ -86,7 +91,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -86,7 +91,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
"#{type}_retry_at" => next_retry_time(new_count)) "#{type}_retry_at" => next_retry_time(new_count))
end end
# Is called when synchronization finishes without any issue
#
# @param [String] type must be one of the values in TYPES
# @see REGISTRY_TYPES
def finish_sync!(type, missing_on_primary = false) def finish_sync!(type, missing_on_primary = false)
ensure_valid_type!(type)
update!( update!(
# Indicate that the sync succeeded (but separately mark as synced atomically) # Indicate that the sync succeeded (but separately mark as synced atomically)
"last_#{type}_successful_sync_at" => Time.now, "last_#{type}_successful_sync_at" => Time.now,
...@@ -104,7 +114,16 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -104,7 +114,16 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
mark_synced_atomically(type) mark_synced_atomically(type)
end end
# Is called when synchronization fails with an exception
#
# @param [String] type must be one of the values in TYPES
# @param [String] message with a human readable description of the failure
# @param [Exception] error the exception
# @param [Hash] attrs attributes to update the database with
# @see REGISTRY_TYPES
def fail_sync!(type, message, error, attrs = {}) def fail_sync!(type, message, error, attrs = {})
ensure_valid_type!(type)
attrs["resync_#{type}"] = true attrs["resync_#{type}"] = true
attrs["last_#{type}_sync_failure"] = "#{message}: #{error.message}" attrs["last_#{type}_sync_failure"] = "#{message}: #{error.message}"
attrs["#{type}_retry_count"] = retry_count(type) + 1 attrs["#{type}_retry_count"] = retry_count(type) + 1
...@@ -120,9 +139,13 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -120,9 +139,13 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
# Marks the project as dirty. # Marks the project as dirty.
# #
# resync_#{type}_was_scheduled_at tracks scheduled_at to avoid a race condition. # resync_#{type}_was_scheduled_at tracks scheduled_at to avoid a race condition.
# See the method #mark_synced_atomically. # @see #mark_synced_atomically
def repository_updated!(repository_updated_event, scheduled_at) #
type = repository_updated_event.source # @param [String] type must be one of the values in TYPES
# @param [Time] scheduled_at when it was scheduled
# @see REGISTRY_TYPES
def repository_updated!(type, scheduled_at)
ensure_valid_type!(type)
update!( update!(
"resync_#{type}" => true, "resync_#{type}" => true,
...@@ -162,7 +185,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -162,7 +185,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
Gitlab::Redis::SharedState.with { |redis| redis.set(fetches_since_gc_redis_key, value) } Gitlab::Redis::SharedState.with { |redis| redis.set(fetches_since_gc_redis_key, value) }
end end
# Check if we should re-download *type*
#
# @param [String] type must be one of the values in TYPES
# @see REGISTRY_TYPES
def should_be_redownloaded?(type) def should_be_redownloaded?(type)
ensure_valid_type!(type)
return true if public_send("force_to_redownload_#{type}") # rubocop:disable GitlabSecurity/PublicSend return true if public_send("force_to_redownload_#{type}") # rubocop:disable GitlabSecurity/PublicSend
retry_count(type) > RETRIES_BEFORE_REDOWNLOAD retry_count(type) > RETRIES_BEFORE_REDOWNLOAD
...@@ -200,11 +228,19 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -200,11 +228,19 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
last_wiki_synced_at && timestamp > last_wiki_synced_at last_wiki_synced_at && timestamp > last_wiki_synced_at
end end
# How many times have we retried syncing it?
#
# @param [String] type must be one of the values in TYPES
# @see REGISTRY_TYPES
def retry_count(type) def retry_count(type)
public_send("#{type}_retry_count") || -1 # rubocop:disable GitlabSecurity/PublicSend public_send("#{type}_retry_count") || -1 # rubocop:disable GitlabSecurity/PublicSend
end end
# Mark repository as synced using atomic conditions
#
# @return [Boolean] whether the update was successful # @return [Boolean] whether the update was successful
# @param [String] type must be one of the values in TYPES
# @see REGISTRY_TYPES
def mark_synced_atomically(type) def mark_synced_atomically(type)
# Indicates whether the project is dirty (needs to be synced). # Indicates whether the project is dirty (needs to be synced).
# #
...@@ -233,4 +269,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -233,4 +269,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
num_rows > 0 num_rows > 0
end end
# Make sure informed type is one of the allowed values
#
# @param [String] type must be one of the values in TYPES otherwise it will fail
# @see REGISTRY_TYPES
def ensure_valid_type!(type)
raise ArgumentError, "Invalid type: '#{type.inspect}' informed. Must be one of the following: #{REGISTRY_TYPES.map { |type| "'#{type}'" }.join(', ')}" unless REGISTRY_TYPES.include?(type.to_sym)
end
end end
...@@ -6,7 +6,7 @@ module Gitlab ...@@ -6,7 +6,7 @@ module Gitlab
include BaseEvent include BaseEvent
def process def process
registry.repository_updated!(event, scheduled_at) registry.repository_updated!(event.source, scheduled_at)
job_id = enqueue_job_if_shard_healthy(event) do job_id = enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, scheduled_at) ::Geo::ProjectSyncWorker.perform_async(event.project_id, scheduled_at)
......
...@@ -699,7 +699,7 @@ describe Geo::ProjectRegistry do ...@@ -699,7 +699,7 @@ describe Geo::ProjectRegistry do
repository_retry_count: 1, repository_retry_count: 1,
repository_verification_retry_count: 1) repository_verification_retry_count: 1)
subject.repository_updated!(event, Time.now) subject.repository_updated!(event.source, Time.now)
end end
it 'resets sync state' do it 'resets sync state' do
...@@ -737,7 +737,7 @@ describe Geo::ProjectRegistry do ...@@ -737,7 +737,7 @@ describe Geo::ProjectRegistry do
wiki_retry_count: 1, wiki_retry_count: 1,
wiki_verification_retry_count: 1) wiki_verification_retry_count: 1)
subject.repository_updated!(event, Time.now) subject.repository_updated!(event.source, Time.now)
end end
it 'resets sync state' do it 'resets sync state' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment