Commit 6b43af8d authored by Alexandru Croitor's avatar Alexandru Croitor

Process stuck jira import jobs

Detect stuck jira import jobs every 15 mins and set them to failed
with respective error message.
parent 1e7b5fcb
......@@ -5,6 +5,9 @@ module ImportState
extend ActiveSupport::Concern
included do
scope :with_jid, -> { where.not(jid: nil) }
scope :without_jid, -> { where(jid: nil) }
# Refreshes the expiration time of the associated import job ID.
#
# This method can be used by asynchronous importers to refresh the status,
......
......@@ -7,6 +7,7 @@ class JiraImportState < ApplicationRecord
self.table_name = 'jira_imports'
ERROR_MESSAGE_SIZE = 1000 # 1000 characters limit
STATUSES = { initial: 0, scheduled: 1, started: 2, failed: 3, finished: 4 }.freeze
belongs_to :project
......@@ -14,6 +15,7 @@ class JiraImportState < ApplicationRecord
belongs_to :label
scope :by_jira_project_key, -> (jira_project_key) { where(jira_project_key: jira_project_key) }
scope :with_status, ->(statuses) { where(status: statuses) }
validates :project, presence: true
validates :jira_project_key, presence: true
......@@ -25,6 +27,8 @@ class JiraImportState < ApplicationRecord
message: _('Cannot have multiple Jira imports running at the same time')
}
before_save :ensure_error_message_size
alias_method :scheduled_by, :user
state_machine :status, initial: :initial do
......@@ -65,6 +69,13 @@ class JiraImportState < ApplicationRecord
end
end
after_transition any => :failed do |state, transition|
arguments_hash = transition.args.first
error_message = arguments_hash&.dig(:error_message)
state.update_column(:error_message, error_message) if error_message.present?
end
# Supress warning:
# both JiraImportState and its :status machine have defined a different default for "status".
# although both have same value but represented in 2 ways: integer(0) and symbol(:initial)
......@@ -102,4 +113,18 @@ class JiraImportState < ApplicationRecord
def self.finished_imports_count
finished.sum(:imported_issues_count)
end
def mark_as_failed(error_message)
sanitized_message = Gitlab::UrlSanitizer.sanitize(error_message)
do_fail(error_message: error_message)
rescue ActiveRecord::ActiveRecordError => e
Gitlab::AppLogger.error("Error setting import status to failed: #{e.message}. Original error: #{sanitized_message}")
end
private
def ensure_error_message_size
self.error_message = error_message&.truncate(ERROR_MESSAGE_SIZE)
end
end
......@@ -28,8 +28,8 @@ module JiraImport
rescue => ex
# in case project.save! raises an erorr
Gitlab::ErrorTracking.track_exception(ex, project_id: project.id)
jira_import&.do_fail!(error_message: ex.message)
build_error_response(ex.message)
jira_import.do_fail!
end
def build_jira_import
......
......@@ -155,6 +155,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:jira_import_stuck_jira_import_jobs
:feature_category: :importers
:has_external_dependencies:
:urgency: :low
:resource_boundary: :cpu
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:namespaces_prune_aggregation_schedules
:feature_category: :source_code_management
:has_external_dependencies:
......
# frozen_string_literal: true
module Gitlab
module Import
module StuckImportJob
extend ActiveSupport::Concern
IMPORT_JOBS_EXPIRATION = 15.hours.seconds.to_i
included do
include ApplicationWorker
# rubocop:disable Scalability/CronWorkerContext
# This worker updates several import states inline and does not schedule
# other jobs. So no context needed
include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext
feature_category :importers
worker_resource_boundary :cpu
end
def perform
stuck_imports_without_jid_count = mark_imports_without_jid_as_failed!
stuck_imports_with_jid_count = mark_imports_with_jid_as_failed!
track_metrics(stuck_imports_with_jid_count, stuck_imports_without_jid_count)
end
private
def track_metrics(with_jid_count, without_jid_count)
raise NotImplementedError
end
def mark_imports_without_jid_as_failed!
enqueued_import_states_without_jid.each do |import_state|
import_state.mark_as_failed(error_message)
end.size
end
def mark_imports_with_jid_as_failed!
jids_and_ids = enqueued_import_states_with_jid.pluck(:jid, :id).to_h # rubocop: disable CodeReuse/ActiveRecord
# Find the jobs that aren't currently running or that exceeded the threshold.
completed_jids = Gitlab::SidekiqStatus.completed_jids(jids_and_ids.keys)
return 0 unless completed_jids.any?
completed_import_state_ids = jids_and_ids.values_at(*completed_jids)
# We select the import states again, because they may have transitioned from
# scheduled/started to finished/failed while we were looking up their Sidekiq status.
completed_import_states = enqueued_import_states_with_jid.id_in(completed_import_state_ids)
completed_import_state_jids = completed_import_states.map { |import_state| import_state.jid }.join(', ')
Gitlab::Import::Logger.info(
message: 'Marked stuck import jobs as failed',
job_ids: completed_import_state_jids
)
completed_import_states.each do |import_state|
import_state.mark_as_failed(error_message)
end.size
end
def enqueued_import_states
raise NotImplementedError
end
def enqueued_import_states_with_jid
enqueued_import_states.with_jid
end
def enqueued_import_states_without_jid
enqueued_import_states.without_jid
end
def error_message
_("Import timed out. Import took longer than %{import_jobs_expiration} seconds") % { import_jobs_expiration: IMPORT_JOBS_EXPIRATION }
end
end
end
end
# frozen_string_literal: true
module Gitlab
module JiraImport
class StuckJiraImportJobsWorker # rubocop:disable Scalability/IdempotentWorker
include Gitlab::Import::StuckImportJob
private
def track_metrics(with_jid_count, without_jid_count)
Gitlab::Metrics.add_event(:stuck_jira_import_jobs,
jira_imports_without_jid_count: with_jid_count,
jira_imports_with_jid_count: without_jid_count)
end
def enqueued_import_states
JiraImportState.with_status([:scheduled, :started])
end
end
end
end
# frozen_string_literal: true
class StuckImportJobsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
# rubocop:disable Scalability/CronWorkerContext
# This worker updates several import states inline and does not schedule
# other jobs. So no context needed
include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext
include Gitlab::Import::StuckImportJob
feature_category :importers
worker_resource_boundary :cpu
IMPORT_JOBS_EXPIRATION = 15.hours.to_i
def perform
import_state_without_jid_count = mark_import_states_without_jid_as_failed!
import_state_with_jid_count = mark_import_states_with_jid_as_failed!
Gitlab::Metrics.add_event(:stuck_import_jobs,
projects_without_jid_count: import_state_without_jid_count,
projects_with_jid_count: import_state_with_jid_count)
end
IMPORT_JOBS_EXPIRATION = Gitlab::Import::StuckImportJob::IMPORT_JOBS_EXPIRATION
private
def mark_import_states_without_jid_as_failed!
enqueued_import_states_without_jid.each do |import_state|
import_state.mark_as_failed(error_message)
end.count
end
# rubocop: disable CodeReuse/ActiveRecord
def mark_import_states_with_jid_as_failed!
jids_and_ids = enqueued_import_states_with_jid.pluck(:jid, :id).to_h
# Find the jobs that aren't currently running or that exceeded the threshold.
completed_jids = Gitlab::SidekiqStatus.completed_jids(jids_and_ids.keys)
return unless completed_jids.any?
completed_import_state_ids = jids_and_ids.values_at(*completed_jids)
# We select the import states again, because they may have transitioned from
# scheduled/started to finished/failed while we were looking up their Sidekiq status.
completed_import_states = enqueued_import_states_with_jid.where(id: completed_import_state_ids)
completed_import_state_jids = completed_import_states.map { |import_state| import_state.jid }.join(', ')
Gitlab::Import::Logger.info(
message: 'Marked stuck import jobs as failed',
job_ids: completed_import_state_jids
def track_metrics(with_jid_count, without_jid_count)
Gitlab::Metrics.add_event(
:stuck_import_jobs,
projects_without_jid_count: without_jid_count,
projects_with_jid_count: with_jid_count
)
completed_import_states.each do |import_state|
import_state.mark_as_failed(error_message)
end.count
end
# rubocop: enable CodeReuse/ActiveRecord
def enqueued_import_states
ProjectImportState.with_status([:scheduled, :started])
end
# rubocop: disable CodeReuse/ActiveRecord
def enqueued_import_states_with_jid
enqueued_import_states.where.not(jid: nil)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def enqueued_import_states_without_jid
enqueued_import_states.where(jid: nil)
end
# rubocop: enable CodeReuse/ActiveRecord
def error_message
_("Import timed out. Import took longer than %{import_jobs_expiration} seconds") % { import_jobs_expiration: IMPORT_JOBS_EXPIRATION }
end
end
---
title: Process stuck jira import jobs
merge_request: 32643
author:
type: added
......@@ -454,6 +454,9 @@ Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['job_class'] = 'Rem
Settings.cron_jobs['stuck_import_jobs_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['stuck_import_jobs_worker']['cron'] ||= '15 * * * *'
Settings.cron_jobs['stuck_import_jobs_worker']['job_class'] = 'StuckImportJobsWorker'
Settings.cron_jobs['jira_import_stuck_jira_import_jobs'] ||= Settingslogic.new({})
Settings.cron_jobs['jira_import_stuck_jira_import_jobs']['cron'] ||= '* 0/15 * * *'
Settings.cron_jobs['jira_import_stuck_jira_import_jobs']['job_class'] = 'Gitlab::JiraImport::StuckJiraImportJobsWorker'
Settings.cron_jobs['stuck_export_jobs_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['stuck_export_jobs_worker']['cron'] ||= '30 * * * *'
Settings.cron_jobs['stuck_export_jobs_worker']['job_class'] = 'StuckExportJobsWorker'
......
# frozen_string_literal: true
class AddErrorMessageColumnToJiraImports < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
unless column_exists?(:jira_imports, :error_message)
add_column :jira_imports, :error_message, :text
end
add_text_limit :jira_imports, :error_message, 1000
end
def down
return unless column_exists?(:jira_imports, :error_message)
remove_column :jira_imports, :error_message
end
end
......@@ -3516,7 +3516,9 @@ CREATE TABLE public.jira_imports (
jid character varying(255),
jira_project_key character varying(255) NOT NULL,
jira_project_name character varying(255) NOT NULL,
scheduled_at timestamp with time zone
scheduled_at timestamp with time zone,
error_message text,
CONSTRAINT check_9ed451c5b1 CHECK ((char_length(error_message) <= 1000))
);
CREATE SEQUENCE public.jira_imports_id_seq
......@@ -13955,6 +13957,7 @@ COPY "schema_migrations" (version) FROM STDIN;
20200515155620
20200518091745
20200518133123
20200519101002
20200519115908
20200519171058
20200519194042
......
......@@ -54,8 +54,8 @@ class StuckImportJobsWorker
IMPORT_JOBS_EXPIRATION = 15.hours.to_i
def perform
import_state_without_jid_count = mark_import_states_without_jid_as_failed!
import_state_with_jid_count = mark_import_states_with_jid_as_failed!
imports_without_jid_count = mark_imports_without_jid_as_failed!
imports_with_jid_count = mark_imports_with_jid_as_failed!
...
```
......
......@@ -26,6 +26,7 @@ describe Gitlab::SidekiqConfig do
queues = described_class.expand_queues(%w[cronjob])
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:jira_import_stuck_jira_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
end
......
......@@ -76,7 +76,12 @@ describe Gitlab::SidekiqConfig::CliMethods do
describe '.expand_queues' do
let(:worker_queues) do
['cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs', 'post_receive']
[
'cronjob:stuck_import_jobs',
'cronjob:jira_import_stuck_jira_import_jobs',
'cronjob:stuck_merge_jobs',
'post_receive'
]
end
it 'defaults the value of the second argument to .worker_queues' do
......@@ -88,12 +93,12 @@ describe Gitlab::SidekiqConfig::CliMethods do
allow(described_class).to receive(:worker_queues).and_return(worker_queues)
expect(described_class.expand_queues(['cronjob']))
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:jira_import_stuck_jira_import_jobs', 'cronjob:stuck_merge_jobs')
end
it 'expands queue namespaces to concrete queue names' do
expect(described_class.expand_queues(['cronjob'], worker_queues))
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:jira_import_stuck_jira_import_jobs', 'cronjob:stuck_merge_jobs')
end
it 'lets concrete queue names pass through' do
......
......@@ -19,6 +19,7 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('post_receive')
expect(queues).to include('merge')
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:jira_import_stuck_jira_import_jobs')
expect(queues).to include('mailers')
expect(queues).to include('default')
end
......
......@@ -17,6 +17,7 @@ describe Gitlab::SidekiqVersioning::Manager do
expect(queues).to include('repository_fork')
expect(queues).to include('cronjob')
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:jira_import_stuck_jira_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
expect(queues).to include('unknown')
end
......
......@@ -163,4 +163,39 @@ describe JiraImportState do
end
end
end
context 'ensure error_message size on save' do
let_it_be(:project) { create(:project) }
before do
stub_const('JiraImportState::ERROR_MESSAGE_SIZE', 10)
end
context 'when jira import has no error_message' do
let(:jira_import) { build(:jira_import_state, project: project)}
it 'does not run the callback', :aggregate_failures do
expect { jira_import.save }.to change { JiraImportState.count }.by(1)
expect(jira_import.reload.error_message).to be_nil
end
end
context 'when jira import error_message does not exceed the limit' do
let(:jira_import) { build(:jira_import_state, project: project, error_message: 'error')}
it 'does not run the callback', :aggregate_failures do
expect { jira_import.save }.to change { JiraImportState.count }.by(1)
expect(jira_import.reload.error_message).to eq('error')
end
end
context 'when error_message exceeds limit' do
let(:jira_import) { build(:jira_import_state, project: project, error_message: 'error message longer than the limit')}
it 'truncates error_message to the limit', :aggregate_failures do
expect { jira_import.save! }.to change { JiraImportState.count }.by(1)
expect(jira_import.reload.error_message.size).to eq 10
end
end
end
end
......@@ -45,6 +45,22 @@ describe JiraImport::StartImportService do
it_behaves_like 'responds with error', 'Jira import is already running.'
end
context 'when an error is raised while scheduling import' do
before do
expect_next_instance_of(JiraImportState) do |jira_impport|
expect(jira_impport).to receive(:schedule!).and_raise(Projects::ImportService::Error, 'Unexpected failure.')
end
end
it_behaves_like 'responds with error', 'Unexpected failure.'
it 'saves the error message' do
subject
expect(JiraImportState.last.error_message).to eq('Unexpected failure.')
end
end
context 'when everything is ok' do
it 'returns success response' do
expect(subject).to be_a(ServiceResponse)
......@@ -57,7 +73,7 @@ describe JiraImport::StartImportService do
expect(project.latest_jira_import).to be_scheduled
end
it 'creates Jira import data' do
it 'creates Jira import data', :aggregate_failures do
jira_import = subject.payload[:import_data]
expect(jira_import.jira_project_xid).to eq(0)
......@@ -72,8 +88,8 @@ describe JiraImport::StartImportService do
it 'creates Jira label title with correct number' do
jira_import = subject.payload[:import_data]
label_title = "jira-import::#{jira_import.jira_project_key}-1"
expect(jira_import.label.title).to eq(label_title)
end
end
......@@ -83,8 +99,8 @@ describe JiraImport::StartImportService do
it 'creates Jira label title with correct number' do
jira_import = subject.payload[:import_data]
label_title = "jira-import::#{jira_import.jira_project_key}-4"
expect(jira_import.label.title).to eq(label_title)
end
end
......
# frozen_string_literal: true
shared_examples 'stuck import job detection' do
context 'when the job has completed' do
context 'when the import status was already updated' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids) do
import_state.start
import_state.finish
[import_state.jid]
end
end
it 'does not mark the import as failed' do
worker.perform
expect(import_state.reload.status).to eq('finished')
end
end
context 'when the import status was not updated' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([import_state.jid])
end
it 'marks the import as failed' do
worker.perform
expect(import_state.reload.status).to eq('failed')
end
end
end
context 'when the job is still in Sidekiq' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
end
it 'does not mark the import as failed' do
expect { worker.perform }.not_to change { import_state.reload.status }
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe ::Gitlab::JiraImport::StuckJiraImportJobsWorker do
let_it_be(:current_user) { create(:user) }
let_it_be(:project) { create(:project) }
let(:worker) { described_class.new }
describe 'with scheduled Jira import' do
it_behaves_like 'stuck import job detection' do
let(:import_state) { create(:jira_import_state, :scheduled, project: project) }
before do
import_state.update(jid: '123')
end
end
end
describe 'with started jira import' do
it_behaves_like 'stuck import job detection' do
let(:import_state) { create(:jira_import_state, :started, project: project) }
before do
import_state.update(jid: '123')
end
end
end
describe 'with failed jira import' do
let(:import_state) { create(:jira_import_state, :failed, project: project) }
it 'detects no stuck jobs' do
expect(worker).to receive(:track_metrics).with(0, 0)
worker.perform
end
end
end
......@@ -5,51 +5,8 @@ require 'spec_helper'
describe StuckImportJobsWorker do
let(:worker) { described_class.new }
shared_examples 'project import job detection' do
context 'when the job has completed' do
context 'when the import status was already updated' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids) do
import_state.start
import_state.finish
[import_state.jid]
end
end
it 'does not mark the project as failed' do
worker.perform
expect(import_state.reload.status).to eq('finished')
end
end
context 'when the import status was not updated' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([import_state.jid])
end
it 'marks the project as failed' do
worker.perform
expect(import_state.reload.status).to eq('failed')
end
end
end
context 'when the job is still in Sidekiq' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
end
it 'does not mark the project as failed' do
expect { worker.perform }.not_to change { import_state.reload.status }
end
end
end
describe 'with scheduled import_status' do
it_behaves_like 'project import job detection' do
it_behaves_like 'stuck import job detection' do
let(:import_state) { create(:project, :import_scheduled).import_state }
before do
......@@ -59,7 +16,7 @@ describe StuckImportJobsWorker do
end
describe 'with started import_status' do
it_behaves_like 'project import job detection' do
it_behaves_like 'stuck import job detection' do
let(:import_state) { create(:project, :import_started).import_state }
before do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment