Commit b569b999 authored by Tiago Botelho's avatar Tiago Botelho

Fork and Import jobs only get marked as failed when the number of Sidekiq retries were exhausted

Conflicts:
	app/workers/repository_import_worker.rb
parent ba8d3532
module ProjectImportOptions
extend ActiveSupport::Concern
included do
IMPORT_RETRY_COUNT = 5
sidekiq_options retry: IMPORT_RETRY_COUNT, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
# We only want to mark the project as failed once we exhausted all retries
sidekiq_retries_exhausted do |job|
project = Project.find(job['args'].first)
action = if project.forked?
"fork"
else
"import"
end
project.mark_import_as_failed("Every #{action} attempt has failed: #{job['error_message']}. Please try again.")
Sidekiq.logger.warn "Failed #{job['class']} with #{job['args']}: #{job['error_message']}"
end
end
end
# Used in EE by mirroring
module ProjectStartImport module ProjectStartImport
def start(project) def start(project)
if project.import_started? && project.import_jid == self.jid if project.import_started? && project.import_jid == self.jid
......
class RepositoryForkWorker class RepositoryForkWorker
ForkError = Class.new(StandardError)
include ApplicationWorker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include ProjectStartImport include ProjectStartImport
include ProjectImportOptions
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
def perform(project_id, forked_from_repository_storage_path, source_disk_path) def perform(project_id, forked_from_repository_storage_path, source_disk_path)
project = Project.find(project_id) project = Project.find(project_id)
...@@ -18,20 +15,12 @@ class RepositoryForkWorker ...@@ -18,20 +15,12 @@ class RepositoryForkWorker
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path, result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path,
project.repository_storage_path, project.disk_path) project.repository_storage_path, project.disk_path)
raise ForkError, "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result raise "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result
project.repository.after_import project.repository.after_import
raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo? raise "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
project.import_finish project.import_finish
rescue ForkError => ex
fail_fork(project, ex.message)
raise
rescue => ex
return unless project
fail_fork(project, ex.message)
raise ForkError, "#{ex.class} #{ex.message}"
end end
private private
...@@ -42,9 +31,4 @@ class RepositoryForkWorker ...@@ -42,9 +31,4 @@ class RepositoryForkWorker
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.") Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
false false
end end
def fail_fork(project, message)
Rails.logger.error(message)
project.mark_import_as_failed(message)
end
end end
class RepositoryImportWorker class RepositoryImportWorker
ImportError = Class.new(StandardError)
include ApplicationWorker include ApplicationWorker
include ExceptionBacktrace include ExceptionBacktrace
include ProjectStartImport include ProjectStartImport
include ProjectImportOptions
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
def perform(project_id) def perform(project_id)
project = Project.find(project_id) project = Project.find(project_id)
...@@ -23,21 +20,13 @@ class RepositoryImportWorker ...@@ -23,21 +20,13 @@ class RepositoryImportWorker
# to those importers to mark the import process as complete. # to those importers to mark the import process as complete.
return if service.async? return if service.async?
raise ImportError, result[:message] if result[:status] == :error raise result[:message] if result[:status] == :error
project.after_import project.after_import
# Explicitly enqueue mirror for update so # Explicitly enqueue mirror for update so
# that upstream remote is created and fetched # that upstream remote is created and fetched
project.force_import_job! if project.mirror? project.force_import_job! if project.mirror?
rescue ImportError => ex
fail_import(project, ex.message)
raise
rescue => ex
return unless project
fail_import(project, ex.message)
raise ImportError, "#{ex.class} #{ex.message}"
end end
private private
...@@ -48,8 +37,4 @@ class RepositoryImportWorker ...@@ -48,8 +37,4 @@ class RepositoryImportWorker
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.") Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
false false
end end
def fail_import(project, message)
project.mark_import_as_failed(message)
end
end end
---
title: Only mark import and fork jobs as failed once all Sidekiq retries get exhausted
merge_request: 15844
author:
type: changed
require 'spec_helper'
describe ProjectImportOptions do
let(:project) { create(:project, :import_started) }
let(:job) { { 'args' => [project.id, nil, nil], 'jid' => '123' } }
let(:worker_class) do
Class.new do
include Sidekiq::Worker
include ProjectImportOptions
end
end
it 'sets default retry limit' do
expect(worker_class.sidekiq_options['retry']).to eq(ProjectImportOptions::IMPORT_RETRY_COUNT)
end
it 'sets default status expiration' do
expect(worker_class.sidekiq_options['status_expiration']).to eq(StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION)
end
describe '.sidekiq_retries_exhausted' do
it 'marks fork as failed' do
expect { worker_class.sidekiq_retries_exhausted_block.call(job) }.to change { project.reload.import_status }.from("started").to("failed")
end
it 'logs the appropriate error message for forked projects' do
allow_any_instance_of(Project).to receive(:forked?).and_return(true)
worker_class.sidekiq_retries_exhausted_block.call(job)
expect(project.reload.import_error).to include("fork")
end
it 'logs the appropriate error message for forked projects' do
worker_class.sidekiq_retries_exhausted_block.call(job)
expect(project.reload.import_error).to include("import")
end
end
end
require 'spec_helper' require 'spec_helper'
describe RepositoryForkWorker do describe RepositoryForkWorker do
let(:project) { create(:project, :repository) } describe 'modules' do
let(:fork_project) { create(:project, :repository, :import_scheduled, forked_from_project: project) } it 'includes ProjectImportOptions' do
let(:shell) { Gitlab::Shell.new } expect(described_class).to include_module(ProjectImportOptions)
end
subject { described_class.new }
before do
allow(subject).to receive(:gitlab_shell).and_return(shell)
end end
describe "#perform" do describe "#perform" do
let(:project) { create(:project, :repository) }
let(:fork_project) { create(:project, :repository, :import_scheduled, forked_from_project: project) }
let(:shell) { Gitlab::Shell.new }
before do
allow(subject).to receive(:gitlab_shell).and_return(shell)
end
def perform! def perform!
subject.perform(fork_project.id, '/test/path', project.disk_path) subject.perform(fork_project.id, '/test/path', project.disk_path)
end end
...@@ -60,14 +64,7 @@ describe RepositoryForkWorker do ...@@ -60,14 +64,7 @@ describe RepositoryForkWorker do
expect_fork_repository.and_return(false) expect_fork_repository.and_return(false)
expect { perform! }.to raise_error(RepositoryForkWorker::ForkError, error_message) expect { perform! }.to raise_error(StandardError, error_message)
end
it 'handles unexpected error' do
expect_fork_repository.and_raise(RuntimeError)
expect { perform! }.to raise_error(RepositoryForkWorker::ForkError)
expect(fork_project.reload.import_status).to eq('failed')
end end
end end
end end
require 'spec_helper' require 'spec_helper'
describe RepositoryImportWorker do describe RepositoryImportWorker do
let(:project) { create(:project, :import_scheduled) } describe 'modules' do
it 'includes ProjectImportOptions' do
subject { described_class.new } expect(described_class).to include_module(ProjectImportOptions)
end
end
describe '#perform' do describe '#perform' do
let(:project) { create(:project, :import_scheduled) }
context 'when worker was reset without cleanup' do context 'when worker was reset without cleanup' do
let(:jid) { '12345678' } let(:jid) { '12345678' }
let(:started_project) { create(:project, :import_started, import_jid: jid) } let(:started_project) { create(:project, :import_started, import_jid: jid) }
...@@ -57,22 +61,11 @@ describe RepositoryImportWorker do ...@@ -57,22 +61,11 @@ describe RepositoryImportWorker do
expect do expect do
subject.perform(project.id) subject.perform(project.id)
end.to raise_error(RepositoryImportWorker::ImportError, error) end.to raise_error(StandardError, error)
expect(project.reload.import_jid).not_to be_nil expect(project.reload.import_jid).not_to be_nil
end end
end end
context 'with unexpected error' do
it 'marks import as failed' do
allow_any_instance_of(Projects::ImportService).to receive(:execute).and_raise(RuntimeError)
expect do
subject.perform(project.id)
end.to raise_error(RepositoryImportWorker::ImportError)
expect(project.reload.import_status).to eq('failed')
end
end
context 'when using an asynchronous importer' do context 'when using an asynchronous importer' do
it 'does not mark the import process as finished' do it 'does not mark the import process as finished' do
service = double(:service) service = double(:service)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment