Commit 58371efb authored by James Lopez's avatar James Lopez

Periodically mark projects that are stuck in importing as failed

Adds import jid to projects
Refactor middleware to set custom expiration time via sidekiq options
Add completed_jids option to sidekiq status and a few other changes
parent 7196adaa
...@@ -2,6 +2,8 @@ class RepositoryImportWorker ...@@ -2,6 +2,8 @@ class RepositoryImportWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue include DedicatedSidekiqQueue
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION
attr_accessor :project, :current_user attr_accessor :project, :current_user
def perform(project_id) def perform(project_id)
...@@ -12,7 +14,7 @@ class RepositoryImportWorker ...@@ -12,7 +14,7 @@ class RepositoryImportWorker
import_url: @project.import_url, import_url: @project.import_url,
path: @project.path_with_namespace) path: @project.path_with_namespace)
project.update_column(:import_error, nil) project.update_columns(import_jid: self.jid, import_error: nil)
result = Projects::ImportService.new(project, current_user).execute result = Projects::ImportService.new(project, current_user).execute
......
class StuckImportJobsWorker
include Sidekiq::Worker
include CronjobQueue
EXCLUSIVE_LEASE_KEY = 'fail_stuck_imports_worker_lease'.freeze
IMPORT_EXPIRATION = 15.hours.to_i
def perform
return unless try_obtain_lease
stuck_projects.find_in_batches(batch_size: 500) do |group|
jids = group.map(&:import_jid)
# Find the jobs that aren't currently running or that exceeded the threshold.
completed_jids = Gitlab::SidekiqStatus.completed_jids(jids)
if completed_jids.any?
completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id)
fail_batch!(completed_jids, completed_ids)
end
end
remove_lease
end
private
def stuck_projects
Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil)
end
def fail_batch!(completed_jids, completed_ids)
Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message)
Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}")
end
def error_message
"Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds"
end
def try_obtain_lease
@uuid = Gitlab::ExclusiveLease.new(EXCLUSIVE_LEASE_KEY, timeout: 30.minutes).try_obtain
end
def remove_lease
Gitlab::ExclusiveLease.cancel(EXCLUSIVE_LEASE_KEY, @uuid)
end
end
---
title: Periodically mark projects that are stuck in importing as failed
merge_request:
author:
...@@ -349,6 +349,9 @@ Settings.cron_jobs['trending_projects_worker']['job_class'] = 'TrendingProjectsW ...@@ -349,6 +349,9 @@ Settings.cron_jobs['trending_projects_worker']['job_class'] = 'TrendingProjectsW
Settings.cron_jobs['remove_unreferenced_lfs_objects_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['remove_unreferenced_lfs_objects_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['cron'] ||= '20 0 * * *' Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['cron'] ||= '20 0 * * *'
Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['job_class'] = 'RemoveUnreferencedLfsObjectsWorker' Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['job_class'] = 'RemoveUnreferencedLfsObjectsWorker'
Settings.cron_jobs['stuck_import_jobs_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['stuck_import_jobs_worker']['cron'] ||= '15 * * * *'
Settings.cron_jobs['stuck_import_jobs_worker']['job_class'] = 'StuckImportJobsWorker'
# #
# GitLab Shell # GitLab Shell
......
class AddImportJidToProjects < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
add_column :projects, :import_jid, :string
end
end
...@@ -920,6 +920,7 @@ ActiveRecord::Schema.define(version: 20170402231018) do ...@@ -920,6 +920,7 @@ ActiveRecord::Schema.define(version: 20170402231018) do
t.text "description_html" t.text "description_html"
t.boolean "only_allow_merge_if_all_discussions_are_resolved" t.boolean "only_allow_merge_if_all_discussions_are_resolved"
t.boolean "printing_merge_request_link_enabled", default: true, null: false t.boolean "printing_merge_request_link_enabled", default: true, null: false
t.string "import_jid"
end end
add_index "projects", ["ci_id"], name: "index_projects_on_ci_id", using: :btree add_index "projects", ["ci_id"], name: "index_projects_on_ci_id", using: :btree
......
...@@ -72,6 +72,8 @@ module Gitlab ...@@ -72,6 +72,8 @@ module Gitlab
# job_ids - The Sidekiq job IDs to check. # job_ids - The Sidekiq job IDs to check.
# #
# Returns an array of true or false indicating job completion. # Returns an array of true or false indicating job completion.
# true = job is still running
# false = job completed
def self.job_status(job_ids) def self.job_status(job_ids)
keys = job_ids.map { |jid| key_for(jid) } keys = job_ids.map { |jid| key_for(jid) }
...@@ -82,6 +84,17 @@ module Gitlab ...@@ -82,6 +84,17 @@ module Gitlab
end end
end end
# Returns the JIDs that are completed
#
# job_ids - The Sidekiq job IDs to check.
#
# Returns an array of completed JIDs
def self.completed_jids(job_ids)
Sidekiq.redis do |redis|
job_ids.reject { |jid| redis.exists(key_for(jid)) }
end
end
def self.key_for(jid) def self.key_for(jid)
STATUS_KEY % jid STATUS_KEY % jid
end end
......
...@@ -2,7 +2,9 @@ module Gitlab ...@@ -2,7 +2,9 @@ module Gitlab
module SidekiqStatus module SidekiqStatus
class ClientMiddleware class ClientMiddleware
def call(_, job, _, _) def call(_, job, _, _)
Gitlab::SidekiqStatus.set(job['jid']) status_expiration = job['status_expiration'] || Gitlab::SidekiqStatus::DEFAULT_EXPIRATION
Gitlab::SidekiqStatus.set(job['jid'], status_expiration)
yield yield
end end
end end
......
...@@ -3,7 +3,7 @@ require 'spec_helper' ...@@ -3,7 +3,7 @@ require 'spec_helper'
describe Gitlab::SidekiqStatus::ClientMiddleware do describe Gitlab::SidekiqStatus::ClientMiddleware do
describe '#call' do describe '#call' do
it 'tracks the job in Redis' do it 'tracks the job in Redis' do
expect(Gitlab::SidekiqStatus).to receive(:set).with('123') expect(Gitlab::SidekiqStatus).to receive(:set).with('123', Gitlab::SidekiqStatus::DEFAULT_EXPIRATION)
described_class.new. described_class.new.
call('Foo', { 'jid' => '123' }, double(:queue), double(:pool)) { nil } call('Foo', { 'jid' => '123' }, double(:queue), double(:pool)) { nil }
......
...@@ -73,4 +73,17 @@ describe Gitlab::SidekiqStatus do ...@@ -73,4 +73,17 @@ describe Gitlab::SidekiqStatus do
expect(key).to include('123') expect(key).to include('123')
end end
end end
describe 'completed', :redis do
it 'returns the completed job' do
expect(described_class.completed_jids(%w(123))).to eq(['123'])
end
it 'returns only the jobs completed' do
described_class.set('123')
described_class.set('456')
expect(described_class.completed_jids(%w(123 456 789))).to eq(['789'])
end
end
end end
...@@ -23,10 +23,12 @@ describe RepositoryImportWorker do ...@@ -23,10 +23,12 @@ describe RepositoryImportWorker do
error = %q{remote: Not Found fatal: repository 'https://user:pass@test.com/root/repoC.git/' not found } error = %q{remote: Not Found fatal: repository 'https://user:pass@test.com/root/repoC.git/' not found }
expect_any_instance_of(Projects::ImportService).to receive(:execute). expect_any_instance_of(Projects::ImportService).to receive(:execute).
and_return({ status: :error, message: error }) and_return({ status: :error, message: error })
allow(subject).to receive(:jid).and_return('123')
subject.perform(project.id) subject.perform(project.id)
expect(project.reload.import_error).to include("https://*****:*****@test.com/root/repoC.git/") expect(project.reload.import_error).to include("https://*****:*****@test.com/root/repoC.git/")
expect(project.reload.import_jid).not_to be_nil
end end
end end
end end
......
require 'spec_helper'
describe StuckImportJobsWorker do
let(:worker) { described_class.new }
let(:exclusive_lease_uuid) { SecureRandom.uuid }
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(exclusive_lease_uuid)
end
describe 'long running import' do
let(:project) { create(:empty_project, import_jid: '123', import_status: 'started') }
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123'])
end
it 'marks the project as failed' do
expect { worker.perform }.to change { project.reload.import_status }.to('failed')
end
end
describe 'running import' do
let(:project) { create(:empty_project, import_jid: '123', import_status: 'started') }
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
end
it 'does not mark the project as failed' do
worker.perform
expect(project.reload.import_status).to eq('started')
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment