Commit 99cc256c authored by Douwe Maan's avatar Douwe Maan

Merge branch 'bvl-remote-mirror-exception-handling-ee' into 'master'

[EE] Rework push mirror retries

See merge request gitlab-org/gitlab-ee!14974
parents 2a66b81f 23e7a876
# frozen_string_literal: true
class RemoteMirrorFinder
attr_accessor :params
def initialize(params)
@params = params
end
# rubocop: disable CodeReuse/ActiveRecord
def execute
RemoteMirror.find_by(id: params[:id])
end
# rubocop: enable CodeReuse/ActiveRecord
end
......@@ -3,7 +3,7 @@
module Emails
module RemoteMirrors
def remote_mirror_update_failed_email(remote_mirror_id, recipient_id)
@remote_mirror = RemoteMirrorFinder.new(id: remote_mirror_id).execute
@remote_mirror = RemoteMirror.find_by_id(remote_mirror_id)
@project = @remote_mirror.project
mail(to: recipient(recipient_id, @project.group), subject: subject('Remote mirror update failed'))
......
......@@ -4,6 +4,8 @@ class RemoteMirror < ApplicationRecord
include AfterCommitQueue
include MirrorAuthentication
MAX_FIRST_RUNTIME = 3.hours
MAX_INCREMENTAL_RUNTIME = 1.hour
PROTECTED_BACKOFF_DELAY = 1.minute
UNPROTECTED_BACKOFF_DELAY = 5.minutes
......@@ -31,11 +33,18 @@ class RemoteMirror < ApplicationRecord
scope :enabled, -> { where(enabled: true) }
scope :started, -> { with_update_status(:started) }
scope :stuck, -> { started.where('last_update_at < ? OR (last_update_at IS NULL AND updated_at < ?)', 1.hour.ago, 3.hours.ago) }
scope :stuck, -> do
started
.where('(last_update_started_at < ? AND last_update_at IS NOT NULL)',
MAX_INCREMENTAL_RUNTIME.ago)
.or(where('(last_update_started_at < ? AND last_update_at IS NULL)',
MAX_FIRST_RUNTIME.ago))
end
state_machine :update_status, initial: :none do
event :update_start do
transition [:none, :finished, :failed] => :started
transition any => :started
end
event :update_finish do
......@@ -46,9 +55,14 @@ class RemoteMirror < ApplicationRecord
transition started: :failed
end
event :update_retry do
transition started: :to_retry
end
state :started
state :finished
state :failed
state :to_retry
after_transition any => :started do |remote_mirror, _|
Gitlab::Metrics.add_event(:remote_mirrors_running)
......@@ -138,16 +152,27 @@ class RemoteMirror < ApplicationRecord
end
def updated_since?(timestamp)
last_update_started_at && last_update_started_at > timestamp && !update_failed?
return false if failed?
last_update_started_at && last_update_started_at > timestamp
end
def mark_for_delete_if_blank_url
mark_for_destruction if url.blank?
end
def mark_as_failed(error_message)
update_column(:last_error, Gitlab::UrlSanitizer.sanitize(error_message))
update_fail
def update_error_message(error_message)
self.last_error = Gitlab::UrlSanitizer.sanitize(error_message)
end
def mark_for_retry!(error_message)
update_error_message(error_message)
update_retry!
end
def mark_as_failed!(error_message)
update_error_message(error_message)
update_fail!
end
def url=(value)
......@@ -190,6 +215,18 @@ class RemoteMirror < ApplicationRecord
update_column(:error_notification_sent, true)
end
def backoff_delay
if self.only_protected_branches
PROTECTED_BACKOFF_DELAY
else
UNPROTECTED_BACKOFF_DELAY
end
end
def max_runtime
last_update_at.present? ? MAX_INCREMENTAL_RUNTIME : MAX_FIRST_RUNTIME
end
private
def store_credentials
......@@ -219,14 +256,6 @@ class RemoteMirror < ApplicationRecord
self.last_update_started_at >= Time.now - backoff_delay
end
def backoff_delay
if self.only_protected_branches
PROTECTED_BACKOFF_DELAY
else
UNPROTECTED_BACKOFF_DELAY
end
end
def reset_fields
update_columns(
last_error: nil,
......
......@@ -2,14 +2,30 @@
module Projects
class UpdateRemoteMirrorService < BaseService
attr_reader :errors
MAX_TRIES = 3
def execute(remote_mirror)
def execute(remote_mirror, tries)
return success unless remote_mirror.enabled?
errors = []
update_mirror(remote_mirror)
success
rescue Gitlab::Git::CommandError => e
# This happens if one of the gitaly calls above fail, for example when
# branches have diverged, or the pre-receive hook fails.
retry_or_fail(remote_mirror, e.message, tries)
error(e.message)
rescue => e
remote_mirror.mark_as_failed!(e.message)
raise e
end
private
def update_mirror(remote_mirror)
remote_mirror.update_start!
begin
remote_mirror.ensure_remote!
repository.fetch_remote(remote_mirror.remote_name, ssh_auth: remote_mirror, no_tags: true)
......@@ -19,14 +35,19 @@ module Projects
end
remote_mirror.update_repository(opts)
rescue => e
errors << e.message.strip
remote_mirror.update_finish!
end
if errors.present?
error(errors.join("\n\n"))
def retry_or_fail(mirror, message, tries)
if tries < MAX_TRIES
mirror.mark_for_retry!(message)
else
success
# It's not likely we'll be able to recover from this ourselves, so we'll
# notify the users of the problem, and don't trigger any sidekiq retries
# Instead, we'll wait for the next change to try the push again, or until
# a user manually retries.
mirror.mark_as_failed!(message)
end
end
end
......
......@@ -43,7 +43,8 @@
= _('Mirrored repositories')
= render_if_exists 'projects/mirrors/mirrored_repositories_count'
%th= _('Direction')
%th= _('Last update')
%th= _('Last update attempt')
%th= _('Last successful update')
%th
%th
%tbody.js-mirrors-table-body
......@@ -53,6 +54,8 @@
%tr.qa-mirrored-repository-row.rspec-mirrored-repository-row{ class: ('bg-secondary' if mirror.disabled?) }
%td.qa-mirror-repository-url= mirror.safe_url
%td= _('Push')
%td
= mirror.last_update_started_at.present? ? time_ago_with_tooltip(mirror.last_update_started_at) : _('Never')
%td.qa-mirror-last-update-at= mirror.last_update_at.present? ? time_ago_with_tooltip(mirror.last_update_at) : _('Never')
%td
- if mirror.disabled?
......
......@@ -4,7 +4,7 @@ class RemoteMirrorNotificationWorker
include ApplicationWorker
def perform(remote_mirror_id)
remote_mirror = RemoteMirrorFinder.new(id: remote_mirror_id).execute
remote_mirror = RemoteMirror.find_by_id(remote_mirror_id)
# We check again if there's an error because a newer run since this job was
# fired could've completed successfully.
......
# frozen_string_literal: true
class RepositoryUpdateRemoteMirrorWorker
UpdateAlreadyInProgressError = Class.new(StandardError)
UpdateError = Class.new(StandardError)
include ApplicationWorker
include Gitlab::ExclusiveLeaseHelpers
sidekiq_options retry: 3, dead: false
sidekiq_retry_in { |count| 30 * count }
LOCK_WAIT_TIME = 30.seconds
MAX_TRIES = 3
sidekiq_retries_exhausted do |msg, _|
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
end
def perform(remote_mirror_id, scheduled_time)
remote_mirror = RemoteMirrorFinder.new(id: remote_mirror_id).execute
def perform(remote_mirror_id, scheduled_time, tries = 0)
remote_mirror = RemoteMirror.find_by_id(remote_mirror_id)
return unless remote_mirror
return if remote_mirror.updated_since?(scheduled_time)
raise UpdateAlreadyInProgressError if remote_mirror.update_in_progress?
# If the update is already running, wait for it to finish before running again
# This will wait for a total of 90 seconds in 3 steps
in_lock(remote_mirror_update_lock(remote_mirror.id),
retries: 3,
ttl: remote_mirror.max_runtime,
sleep_sec: LOCK_WAIT_TIME) do
update_mirror(remote_mirror, scheduled_time, tries)
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# If an update runs longer than 1.5 minutes, we'll reschedule it
# with a backoff. The next run will check if the previous update would
# include the changes that triggered this update and become a no-op.
self.class.perform_in(remote_mirror.backoff_delay, remote_mirror.id, scheduled_time, tries)
end
remote_mirror.update_start
private
project = remote_mirror.project
def update_mirror(mirror, scheduled_time, tries)
project = mirror.project
current_user = project.creator
result = Projects::UpdateRemoteMirrorService.new(project, current_user).execute(remote_mirror)
raise UpdateError, result[:message] if result[:status] == :error
remote_mirror.update_finish
rescue UpdateAlreadyInProgressError
raise
rescue UpdateError => ex
fail_remote_mirror(remote_mirror, ex.message)
raise
rescue => ex
return unless remote_mirror
result = Projects::UpdateRemoteMirrorService.new(project, current_user).execute(mirror, tries)
fail_remote_mirror(remote_mirror, ex.message)
raise UpdateError, "#{ex.class}: #{ex.message}"
if result[:status] == :error && mirror.to_retry?
schedule_retry(mirror, scheduled_time, tries)
end
end
private
def fail_remote_mirror(remote_mirror, message)
remote_mirror.mark_as_failed(message)
def remote_mirror_update_lock(mirror_id)
[self.class.name, mirror_id].join(':')
end
Rails.logger.error(message) # rubocop:disable Gitlab/RailsLogger
def schedule_retry(mirror, scheduled_time, tries)
self.class.perform_in(mirror.backoff_delay, mirror.id, scheduled_time, tries + 1)
end
end
---
title: Retry push mirrors faster when running concurrently, improve error handling
when push mirrors fail
merge_request: 31247
author:
type: changed
......@@ -9,6 +9,8 @@
%tr
%td.mirror-url= @project.safe_import_url
%td= _('Pull')
%td
= import_state.last_update_started_at.present? ? time_ago_with_tooltip(import_state.last_update_started_at) : _('Never')
%td= import_state.last_update_at.present? ? time_ago_with_tooltip(import_state.last_update_at) : _('Never')
%td
- if import_state&.last_error.present?
......
......@@ -89,21 +89,6 @@ describe Project do
end
end
context '#mark_stuck_remote_mirrors_as_failed!' do
it 'fails stuck remote mirrors' do
project = create(:project, :repository, :remote_mirror)
project.remote_mirrors.first.update(
update_status: :started,
last_update_at: 2.days.ago
)
expect do
project.mark_stuck_remote_mirrors_as_failed!
end.to change { project.remote_mirrors.stuck.count }.from(1).to(0)
end
end
context 'mirror' do
subject { build(:project, mirror: true) }
......
......@@ -8554,9 +8554,15 @@ msgstr ""
msgid "Last seen"
msgstr ""
msgid "Last successful update"
msgstr ""
msgid "Last update"
msgstr ""
msgid "Last update attempt"
msgstr ""
msgid "Last updated"
msgstr ""
......
......@@ -2252,6 +2252,21 @@ describe Project do
end
end
describe '#mark_stuck_remote_mirrors_as_failed!' do
it 'fails stuck remote mirrors' do
project = create(:project, :repository, :remote_mirror)
project.remote_mirrors.first.update(
update_status: :started,
last_update_started_at: 2.days.ago
)
expect do
project.mark_stuck_remote_mirrors_as_failed!
end.to change { project.remote_mirrors.stuck.count }.from(1).to(0)
end
end
describe '#ancestors_upto' do
let(:parent) { create(:group) }
let(:child) { create(:group, parent: parent) }
......
......@@ -153,14 +153,14 @@ describe RemoteMirror, :mailer do
end
end
describe '#mark_as_failed' do
describe '#mark_as_failed!' do
let(:remote_mirror) { create(:remote_mirror) }
let(:error_message) { 'http://user:pass@test.com/root/repoC.git/' }
let(:sanitized_error_message) { 'http://*****:*****@test.com/root/repoC.git/' }
subject do
remote_mirror.update_start
remote_mirror.mark_as_failed(error_message)
remote_mirror.mark_as_failed!(error_message)
end
it 'sets the update_status to failed' do
......@@ -204,8 +204,8 @@ describe RemoteMirror, :mailer do
it 'includes mirrors that were started over an hour ago' do
mirror = create_mirror(url: 'http://cantbeblank',
update_status: 'started',
last_update_at: 3.hours.ago,
updated_at: 2.hours.ago)
last_update_started_at: 3.hours.ago,
last_update_at: 2.hours.ago)
expect(described_class.stuck.last).to eq(mirror)
end
......@@ -214,7 +214,7 @@ describe RemoteMirror, :mailer do
mirror = create_mirror(url: 'http://cantbeblank',
update_status: 'started',
last_update_at: nil,
updated_at: 4.hours.ago)
last_update_started_at: 4.hours.ago)
expect(described_class.stuck.last).to eq(mirror)
end
......
......@@ -10,49 +10,91 @@ describe Projects::UpdateRemoteMirrorService do
subject(:service) { described_class.new(project, project.creator) }
describe "#execute" do
describe '#execute' do
subject(:execute!) { service.execute(remote_mirror, 0) }
before do
project.repository.add_branch(project.owner, 'existing-branch', 'master')
allow(remote_mirror).to receive(:update_repository).and_return(true)
end
it "ensures the remote exists" do
it 'ensures the remote exists' do
stub_fetch_remote(project, remote_name: remote_name, ssh_auth: remote_mirror)
expect(remote_mirror).to receive(:ensure_remote!)
service.execute(remote_mirror)
execute!
end
it "fetches the remote repository" do
it 'fetches the remote repository' do
expect(project.repository)
.to receive(:fetch_remote)
.with(remote_mirror.remote_name, no_tags: true, ssh_auth: remote_mirror)
service.execute(remote_mirror)
execute!
end
it "returns success when updated succeeds" do
it 'marks the mirror as started when beginning' do
expect(remote_mirror).to receive(:update_start!).and_call_original
execute!
end
it 'marks the mirror as successfully finished' do
stub_fetch_remote(project, remote_name: remote_name, ssh_auth: remote_mirror)
result = service.execute(remote_mirror)
result = execute!
expect(result[:status]).to eq(:success)
expect(remote_mirror).to be_finished
end
it 'marks the mirror as failed and raises the error when an unexpected error occurs' do
allow(project.repository).to receive(:fetch_remote).and_raise('Badly broken')
expect { execute! }.to raise_error /Badly broken/
expect(remote_mirror).to be_failed
expect(remote_mirror.last_error).to include('Badly broken')
end
context 'when the update fails because of a `Gitlab::Git::CommandError`' do
before do
allow(project.repository).to receive(:fetch_remote).and_raise(Gitlab::Git::CommandError.new('fetch failed'))
end
it 'wraps `Gitlab::Git::CommandError`s in a service error' do
expect(execute!).to eq(status: :error, message: 'fetch failed')
end
it 'marks the mirror as to be retried' do
execute!
expect(remote_mirror).to be_to_retry
expect(remote_mirror.last_error).to include('fetch failed')
end
it "marks the mirror as failed after #{described_class::MAX_TRIES} tries" do
service.execute(remote_mirror, described_class::MAX_TRIES)
expect(remote_mirror).to be_failed
expect(remote_mirror.last_error).to include('fetch failed')
end
end
context 'when syncing all branches' do
it "push all the branches the first time" do
it 'push all the branches the first time' do
stub_fetch_remote(project, remote_name: remote_name, ssh_auth: remote_mirror)
expect(remote_mirror).to receive(:update_repository).with({})
service.execute(remote_mirror)
execute!
end
end
context 'when only syncing protected branches' do
it "sync updated protected branches" do
it 'sync updated protected branches' do
stub_fetch_remote(project, remote_name: remote_name, ssh_auth: remote_mirror)
protected_branch = create_protected_branch(project)
remote_mirror.only_protected_branches = true
......@@ -61,7 +103,7 @@ describe Projects::UpdateRemoteMirrorService do
.to receive(:update_repository)
.with(only_branches_matching: [protected_branch.name])
service.execute(remote_mirror)
execute!
end
def create_protected_branch(project)
......
......@@ -2,99 +2,70 @@
require 'rails_helper'
describe RepositoryUpdateRemoteMirrorWorker do
describe RepositoryUpdateRemoteMirrorWorker, :clean_gitlab_redis_shared_state do
subject { described_class.new }
let(:remote_mirror) { create(:project, :repository, :remote_mirror).remote_mirrors.first }
let(:remote_mirror) { create(:remote_mirror) }
let(:scheduled_time) { Time.now - 5.minutes }
around do |example|
Timecop.freeze(Time.now) { example.run }
end
describe '#perform' do
context 'with status none' do
before do
remote_mirror.update(update_status: 'none')
def expect_mirror_service_to_return(mirror, result, tries = 0)
expect_next_instance_of(Projects::UpdateRemoteMirrorService) do |service|
expect(service).to receive(:execute).with(mirror, tries).and_return(result)
end
it 'sets status as finished when update remote mirror service executes successfully' do
expect_any_instance_of(Projects::UpdateRemoteMirrorService).to receive(:execute).with(remote_mirror).and_return(status: :success)
expect { subject.perform(remote_mirror.id, Time.now) }.to change { remote_mirror.reload.update_status }.to('finished')
end
it 'resets the notification flag upon success' do
expect_any_instance_of(Projects::UpdateRemoteMirrorService).to receive(:execute).with(remote_mirror).and_return(status: :success)
remote_mirror.update_column(:error_notification_sent, true)
describe '#perform' do
it 'calls out to the service to perform the update' do
expect_mirror_service_to_return(remote_mirror, status: :success)
expect { subject.perform(remote_mirror.id, Time.now) }.to change { remote_mirror.reload.error_notification_sent }.to(false)
subject.perform(remote_mirror.id, scheduled_time)
end
it 'sets status as failed when update remote mirror service executes with errors' do
error_message = 'fail!'
it 'does not do anything if the mirror was already updated' do
remote_mirror.update(last_update_started_at: Time.now, update_status: :finished)
expect_next_instance_of(Projects::UpdateRemoteMirrorService) do |service|
expect(service).to receive(:execute).with(remote_mirror).and_return(status: :error, message: error_message)
end
expect(Projects::UpdateRemoteMirrorService).not_to receive(:new)
# Mock the finder so that it returns an object we can set expectations on
expect_next_instance_of(RemoteMirrorFinder) do |finder|
expect(finder).to receive(:execute).and_return(remote_mirror)
subject.perform(remote_mirror.id, scheduled_time)
end
expect(remote_mirror).to receive(:mark_as_failed).with(error_message)
expect do
subject.perform(remote_mirror.id, Time.now)
end.to raise_error(RepositoryUpdateRemoteMirrorWorker::UpdateError, error_message)
end
it 'schedules a retry when the mirror is marked for retrying' do
remote_mirror = create(:remote_mirror, update_status: :to_retry)
expect_mirror_service_to_return(remote_mirror, status: :error, message: 'Retry!')
it 'does nothing if last_update_started_at is higher than the time the job was scheduled in' do
remote_mirror.update(last_update_started_at: Time.now)
expect(described_class)
.to receive(:perform_in)
.with(remote_mirror.backoff_delay, remote_mirror.id, scheduled_time, 1)
expect_any_instance_of(RemoteMirror).to receive(:updated_since?).with(scheduled_time).and_return(true)
expect_any_instance_of(Projects::UpdateRemoteMirrorService).not_to receive(:execute).with(remote_mirror)
expect(subject.perform(remote_mirror.id, scheduled_time)).to be_nil
end
subject.perform(remote_mirror.id, scheduled_time)
end
context 'with unexpected error' do
it 'marks mirror as failed' do
allow_any_instance_of(Projects::UpdateRemoteMirrorService).to receive(:execute).with(remote_mirror).and_raise(RuntimeError)
expect do
subject.perform(remote_mirror.id, Time.now)
end.to raise_error(RepositoryUpdateRemoteMirrorWorker::UpdateError)
expect(remote_mirror.reload.update_status).to eq('failed')
end
it 'clears the lease if there was an unexpected exception' do
expect_next_instance_of(Projects::UpdateRemoteMirrorService) do |service|
expect(service).to receive(:execute).with(remote_mirror, 1).and_raise('Unexpected!')
end
expect { subject.perform(remote_mirror.id, Time.now, 1) }.to raise_error('Unexpected!')
context 'with another worker already running' do
before do
remote_mirror.update(update_status: 'started')
end
lease = Gitlab::ExclusiveLease.new("#{described_class.name}:#{remote_mirror.id}", timeout: 1.second)
it 'raises RemoteMirrorUpdateAlreadyInProgressError' do
expect do
subject.perform(remote_mirror.id, Time.now)
end.to raise_error(RepositoryUpdateRemoteMirrorWorker::UpdateAlreadyInProgressError)
end
expect(lease.try_obtain).not_to be_nil
end
context 'with status failed' do
before do
remote_mirror.update(update_status: 'failed')
end
it 'sets status as finished if last_update_started_at is higher than the time the job was scheduled in' do
remote_mirror.update(last_update_started_at: Time.now)
it 'retries 3 times for the worker to finish before rescheduling' do
expect(subject).to receive(:in_lock)
.with("#{described_class.name}:#{remote_mirror.id}",
retries: 3,
ttl: remote_mirror.max_runtime,
sleep_sec: described_class::LOCK_WAIT_TIME)
.and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
expect(described_class).to receive(:perform_in)
.with(remote_mirror.backoff_delay, remote_mirror.id, scheduled_time, 0)
expect_any_instance_of(RemoteMirror).to receive(:updated_since?).with(scheduled_time).and_return(false)
expect_any_instance_of(Projects::UpdateRemoteMirrorService).to receive(:execute).with(remote_mirror).and_return(status: :success)
expect { subject.perform(remote_mirror.id, scheduled_time) }.to change { remote_mirror.reload.update_status }.to('finished')
end
subject.perform(remote_mirror.id, scheduled_time)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment