Commit c9c99f3a authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch 'geo-logcursor-exit-if-failing-too-long' into 'master'

Geo: Exit LogCursor if it has been failing for too long

Closes #14627 and #14944

See merge request gitlab-org/gitlab-ee!16408
parents 5fbbcbca b044f855
---
title: 'Geo: Exit LogCursor if it has been failing for too long'
merge_request: 16408
author:
type: added
...@@ -7,37 +7,42 @@ module Gitlab ...@@ -7,37 +7,42 @@ module Gitlab
VERSION = '0.2.0'.freeze VERSION = '0.2.0'.freeze
BATCH_SIZE = 250 BATCH_SIZE = 250
SECONDARY_CHECK_INTERVAL = 60 SECONDARY_CHECK_INTERVAL = 60
MAX_ERROR_DURATION = 1800
attr_reader :options attr_reader :options
def initialize(options = {}) def initialize(options = {})
@options = options @options = options
@exit = false @exit = false
@failing_since = nil
end end
def run! def run!
logger.debug('#run!: start') logger.debug('#run!: start')
trap_signals trap_signals
until exit? run_once! until exit?
# Prevent the node from processing events unless it's a secondary
unless Geo.secondary?
logger.debug("#run!: not a secondary, sleeping for #{SECONDARY_CHECK_INTERVAL} secs")
sleep_break(SECONDARY_CHECK_INTERVAL)
next
end
lease = Lease.try_obtain_with_ttl { run_once! } logger.debug('#run!: finish')
return if exit? end
# When no new event is found sleep for a few moments def run_once!
arbitrary_sleep(lease[:ttl]) # Prevent the node from processing events unless it's a secondary
unless Geo.secondary?
logger.debug("#run!: not a secondary, sleeping for #{SECONDARY_CHECK_INTERVAL} secs")
sleep_break(SECONDARY_CHECK_INTERVAL)
return
end end
logger.debug('#run!: finish') lease = Lease.try_obtain_with_ttl { find_and_handle_events! }
handle_error(lease[:error])
# When no new event is found sleep for a few moments
sleep_break(lease[:ttl])
end end
def run_once! def find_and_handle_events!
gap_tracking.fill_gaps { |event_log| handle_single_event(event_log) } gap_tracking.fill_gaps { |event_log| handle_single_event(event_log) }
# Wrap this with the connection to make it possible to reconnect if # Wrap this with the connection to make it possible to reconnect if
...@@ -49,14 +54,28 @@ module Gitlab ...@@ -49,14 +54,28 @@ module Gitlab
private private
def sleep_break(seconds) def handle_error(error)
while seconds > 0 track_failing_since(error)
sleep(1)
seconds -= 1 if excessive_errors?
break if exit? exit!("Consecutive errors for over #{MAX_ERROR_DURATION} seconds")
end end
end end
def track_failing_since(error)
if error
@failing_since ||= Time.now.utc
else
@failing_since = nil
end
end
def excessive_errors?
return unless @failing_since
(Time.now.utc - @failing_since) > MAX_ERROR_DURATION
end
def handle_events(batch, previous_batch_last_id) def handle_events(batch, previous_batch_last_id)
logger.info("#handle_events:", first_id: batch.first.id, last_id: batch.last.id) logger.info("#handle_events:", first_id: batch.first.id, last_id: batch.last.id)
...@@ -112,6 +131,12 @@ module Gitlab ...@@ -112,6 +131,12 @@ module Gitlab
@exit = true @exit = true
end end
def exit!(error_message)
logger.error("Exiting due to: #{error_message}") if error_message
@exit = true
end
def exit? def exit?
@exit @exit
end end
...@@ -127,12 +152,25 @@ module Gitlab ...@@ -127,12 +152,25 @@ module Gitlab
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# Sleeps for the expired TTL that remains on the lease plus some random seconds. # Sleeps for the specified duration plus some random seconds.
# #
# This allows multiple GeoLogCursors to randomly process a batch of events, # This allows multiple GeoLogCursors to randomly process a batch of events,
# without favouring the shortest path (or latency). # without favouring the shortest path (or latency).
def arbitrary_sleep(delay) #
sleep(delay + rand(1..20) * 0.1) # Exits early if needed.
def sleep_break(seconds)
sleep(random_jitter_time)
seconds.to_i.times do
break if exit?
sleep(1)
end
end
# Returns a random float from 0.1 to 2.0
def random_jitter_time
rand(1..20) * 0.1
end end
def gap_tracking def gap_tracking
......
...@@ -43,7 +43,7 @@ module Gitlab ...@@ -43,7 +43,7 @@ module Gitlab
Gitlab::ExclusiveLease.cancel(LEASE_KEY, lease[:uuid]) Gitlab::ExclusiveLease.cancel(LEASE_KEY, lease[:uuid])
{ uuid: false, ttl: LEASE_TIMEOUT } { uuid: false, ttl: LEASE_TIMEOUT, error: true }
end end
end end
......
...@@ -22,21 +22,26 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -22,21 +22,26 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
allow(daemon).to receive(:arbitrary_sleep).and_return(0.1) allow(daemon).to receive(:arbitrary_sleep).and_return(0.1)
end end
# Warning: Ensure an exit condition for the main run! loop, or RSpec will not
# stop without an interrupt. You can use `ensure_exit_on` to specify the exact
# number of calls to `exit?`, with the last call returning `true`.
describe '#run!' do describe '#run!' do
it 'traps signals' do it 'traps signals' do
is_expected.to receive(:exit?).and_return(true) ensure_exit_on(1)
is_expected.to receive(:trap_signals) is_expected.to receive(:trap_signals)
daemon.run! daemon.run!
end end
it 'delegates to #run_once! in a loop' do it 'delegates to #run_once! in a loop' do
is_expected.to receive(:exit?).and_return(false, false, false, true) ensure_exit_on(3)
is_expected.to receive(:run_once!).twice is_expected.to receive(:run_once!).twice
daemon.run! daemon.run!
end end
end
describe '#run_once!' do
it 'skips execution if cannot achieve a lease' do it 'skips execution if cannot achieve a lease' do
lease = stub_exclusive_lease_taken('geo_log_cursor_processed') lease = stub_exclusive_lease_taken('geo_log_cursor_processed')
...@@ -44,34 +49,68 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -44,34 +49,68 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
allow(lease).to receive(:same_uuid?).and_return(false) allow(lease).to receive(:same_uuid?).and_return(false)
allow(Gitlab::Geo::LogCursor::Lease).to receive(:exclusive_lease).and_return(lease) allow(Gitlab::Geo::LogCursor::Lease).to receive(:exclusive_lease).and_return(lease)
is_expected.to receive(:exit?).and_return(false, true) is_expected.not_to receive(:find_and_handle_events!)
is_expected.not_to receive(:run_once!)
daemon.run! daemon.run_once!
end end
it 'skips execution if not a Geo node' do it 'skips execution if not a Geo node' do
stub_current_geo_node(nil) stub_current_geo_node(nil)
is_expected.to receive(:exit?).and_return(false, true)
is_expected.to receive(:sleep_break).with(1.minute) is_expected.to receive(:sleep_break).with(1.minute)
is_expected.not_to receive(:run_once!) is_expected.not_to receive(:find_and_handle_events!)
daemon.run! daemon.run_once!
end end
it 'skips execution if the current node is a primary' do it 'skips execution if the current node is a primary' do
stub_current_geo_node(primary) stub_current_geo_node(primary)
is_expected.to receive(:exit?).and_return(false, true)
is_expected.to receive(:sleep_break).with(1.minute) is_expected.to receive(:sleep_break).with(1.minute)
is_expected.not_to receive(:run_once!) is_expected.not_to receive(:find_and_handle_events!)
daemon.run! daemon.run_once!
end
context 'when the lease block rescues an error' do
context 'when this error is the final straw' do
it 'calls `#exit!`' do
is_expected.to receive(:exit!)
is_expected.to receive(:find_and_handle_events!).and_raise('any error').twice
Timecop.freeze do
daemon.run_once!
Timecop.travel(described_class::MAX_ERROR_DURATION + 1.second) do
daemon.run_once!
end
end
end
end
context 'when this error is not the final straw' do
it 'does not call `#exit!`' do
is_expected.not_to receive(:exit!)
Timecop.freeze do
is_expected.to receive(:find_and_handle_events!).and_raise('any error')
daemon.run_once!
Timecop.travel(described_class::MAX_ERROR_DURATION + 1.second) do
is_expected.to receive(:find_and_handle_events!) # successful
daemon.run_once!
is_expected.to receive(:find_and_handle_events!).and_raise('any error')
daemon.run_once!
end
end
end
end
end end
end end
describe '#run_once!' do describe '#find_and_handle_events!' do
context 'with some event logs' do context 'with some event logs' do
let(:project) { create(:project) } let(:project) { create(:project) }
let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) } let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) }
...@@ -82,7 +121,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -82,7 +121,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
it 'handles events' do it 'handles events' do
expect(daemon).to receive(:handle_events).with(batch, anything) expect(daemon).to receive(:handle_events).with(batch, anything)
daemon.run_once! daemon.find_and_handle_events!
end end
it 'calls #handle_gap_event for each gap the gap tracking finds' do it 'calls #handle_gap_event for each gap the gap tracking finds' do
...@@ -94,7 +133,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -94,7 +133,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
expect(daemon).to receive(:handle_single_event).with(event_log) expect(daemon).to receive(:handle_single_event).with(event_log)
expect(daemon).to receive(:handle_single_event).with(second_event_log) expect(daemon).to receive(:handle_single_event).with(second_event_log)
daemon.run_once! daemon.find_and_handle_events!
end end
end end
...@@ -117,7 +156,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -117,7 +156,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, anything).once expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, anything).once
daemon.run_once! daemon.find_and_handle_events!
end end
it 'does not replay events for projects that do not belong to selected namespaces to replicate' do it 'does not replay events for projects that do not belong to selected namespaces to replicate' do
...@@ -125,14 +164,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -125,14 +164,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
daemon.run_once! daemon.find_and_handle_events!
end end
it 'detects when an event was skipped' do it 'detects when an event was skipped' do
updated_event = create(:geo_repository_updated_event, project: project) updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, id: event_log.id + 2, repository_updated_event: updated_event) new_event = create(:geo_event_log, id: event_log.id + 2, repository_updated_event: updated_event)
daemon.run_once! daemon.find_and_handle_events!
create(:geo_event_log, id: event_log.id + 1) create(:geo_event_log, id: event_log.id + 1)
...@@ -145,11 +184,11 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -145,11 +184,11 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
updated_event = create(:geo_repository_updated_event, project: project) updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, repository_updated_event: updated_event) new_event = create(:geo_event_log, repository_updated_event: updated_event)
daemon.run_once! daemon.find_and_handle_events!
create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event) create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event)
daemon.run_once! daemon.find_and_handle_events!
create(:geo_event_log, id: new_event.id + 1, repository_updated_event: updated_event) create(:geo_event_log, id: new_event.id + 1, repository_updated_event: updated_event)
create(:geo_event_log, id: new_event.id + 2, repository_updated_event: updated_event) create(:geo_event_log, id: new_event.id + 2, repository_updated_event: updated_event)
...@@ -166,7 +205,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -166,7 +205,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
message: '#handle_single_event: unknown event', message: '#handle_single_event: unknown event',
event_log_id: new_event.id)) event_log_id: new_event.id))
daemon.run_once! daemon.find_and_handle_events!
expect(::Geo::EventLogState.last_processed.id).to eq(new_event.id) expect(::Geo::EventLogState.last_processed.id).to eq(new_event.id)
end end
...@@ -184,7 +223,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -184,7 +223,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
event_type: 'Geo::RepositoryUpdatedEvent', event_type: 'Geo::RepositoryUpdatedEvent',
project_id: project.id)) project_id: project.id))
daemon.run_once! daemon.find_and_handle_events!
end end
it 'does not replay events for projects that do not belong to selected shards to replicate' do it 'does not replay events for projects that do not belong to selected shards to replicate' do
...@@ -192,7 +231,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -192,7 +231,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
daemon.run_once! daemon.find_and_handle_events!
end end
end end
end end
...@@ -259,4 +298,20 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do ...@@ -259,4 +298,20 @@ describe Gitlab::Geo::LogCursor::Daemon, :clean_gitlab_redis_shared_state do
gaps gaps
end end
# It is extremely easy to get run! into an infinite loop.
#
# Regardless of `allow` or `expect`, this method ensures that the loop will
# exit at the specified number of exit? calls.
def ensure_exit_on(num_calls = 3, expect = true)
# E.g. If num_calls is `3`, returns is set to `[false, false, true]`.
returns = Array.new(num_calls) { false }
returns[-1] = true
if expect
expect(daemon).to receive(:exit?).and_return(*returns)
else
allow(daemon).to receive(:exit?).and_return(*returns)
end
end
end end
...@@ -33,7 +33,7 @@ describe Gitlab::Geo::LogCursor::Lease, :clean_gitlab_redis_shared_state do ...@@ -33,7 +33,7 @@ describe Gitlab::Geo::LogCursor::Lease, :clean_gitlab_redis_shared_state do
end end
end end
describe '.try_obtain_lease_with_ttl' do describe '.try_obtain_with_ttl' do
it 'returns zero when there is no lease' do it 'returns zero when there is no lease' do
result = described_class.try_obtain_with_ttl {} result = described_class.try_obtain_with_ttl {}
...@@ -78,6 +78,7 @@ describe Gitlab::Geo::LogCursor::Lease, :clean_gitlab_redis_shared_state do ...@@ -78,6 +78,7 @@ describe Gitlab::Geo::LogCursor::Lease, :clean_gitlab_redis_shared_state do
expect(result[:ttl]).to be > 0 expect(result[:ttl]).to be > 0
expect(result[:uuid]).to be false expect(result[:uuid]).to be false
expect(result[:error]).to be true
end end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment