Commit ab23fc1e authored by Furkan Ayhan's avatar Furkan Ayhan Committed by Marius Bobin

Fix DAG order of subsequent jobs after requeue

When we requeue a job, we need to process subsequent skipped jobs,
moreover, we need to do this in a specific order.
Previously, we had an order by stage_idx but after introducing
same-stage jobs, this disappeared. We recently fixed this problem
for stage-based approach. However, the problem still exists for
the same-stage pipelines.

In this commit, we are ordering jobs by stage first,
then their DAG relationships.

These changes are behind a FF ci_fix_order_of_subsequent_jobs
parent 257d5598
......@@ -16,7 +16,7 @@ module Ci
scope :with_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
where('EXISTS (?)', needs).preload(:needs)
where('EXISTS (?)', needs)
end
scope :without_needs, -> (names = nil) do
......
......@@ -22,9 +22,15 @@ module Ci
end
def dependent_jobs
stage_dependent_jobs
.or(needs_dependent_jobs.except(:preload))
dependent_jobs = stage_dependent_jobs
.or(needs_dependent_jobs)
.ordered_by_stage
if ::Feature.enabled?(:ci_fix_order_of_subsequent_jobs, @processable.pipeline.project, default_enabled: :yaml)
dependent_jobs = ordered_by_dag(dependent_jobs)
end
dependent_jobs
end
def process(job)
......@@ -44,5 +50,23 @@ module Ci
def skipped_jobs
@skipped_jobs ||= @processable.pipeline.processables.skipped
end
# rubocop: disable CodeReuse/ActiveRecord
def ordered_by_dag(jobs)
sorted_job_names = sort_jobs(jobs).each_with_index.to_h
jobs.preload(:needs).group_by(&:stage_idx).flat_map do |_, stage_jobs|
stage_jobs.sort_by { |job| sorted_job_names.fetch(job.name) }
end
end
def sort_jobs(jobs)
Gitlab::Ci::YamlProcessor::Dag.order(
jobs.to_h do |job|
[job.name, job.needs.map(&:name)]
end
)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
---
name: ci_fix_order_of_subsequent_jobs
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/74394
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/345587
milestone: '14.9'
type: development
group: group::pipeline authoring
default_enabled: false
......@@ -45,7 +45,7 @@ module Gitlab
validate_job!(name, job)
end
YamlProcessor::Dag.check_circular_dependencies!(@jobs)
check_circular_dependencies
end
def validate_job!(name, job)
......@@ -146,6 +146,17 @@ module Gitlab
end
end
def check_circular_dependencies
jobs = @jobs.values.to_h do |job|
name = job[:name].to_s
needs = job.dig(:needs, :job).to_a
[name, needs.map { |need| need[:name].to_s }]
end
Dag.check_circular_dependencies!(jobs)
end
def error!(message)
raise ValidationError, message
end
......
......@@ -7,28 +7,22 @@ module Gitlab
class Dag
include TSort
MissingNodeError = Class.new(StandardError)
def initialize(nodes)
@nodes = nodes
end
def self.check_circular_dependencies!(jobs)
nodes = jobs.values.to_h do |job|
name = job[:name].to_s
needs = job.dig(:needs, :job).to_a
[name, needs.map { |need| need[:name].to_s }]
def self.order(jobs)
new(jobs).tsort
end
new(nodes).tsort
def self.check_circular_dependencies!(jobs)
new(jobs).tsort
rescue TSort::Cyclic
raise ValidationError, 'The pipeline has circular dependencies'
rescue MissingNodeError
end
def tsort_each_child(node, &block)
raise MissingNodeError, "node #{node} is missing" unless @nodes[node]
return unless @nodes[node]
@nodes[node].each(&block)
end
......
......@@ -27,15 +27,13 @@ RSpec.describe Gitlab::Ci::YamlProcessor::Dag do
end
end
context 'when there is a missing job' do
context 'when there are some missing jobs' do
let(:nodes) do
{ 'job_a' => %w(job_d), 'job_b' => %w(job_a) }
{ 'job_a' => %w(job_d job_f), 'job_b' => %w(job_a job_c job_e) }
end
it 'raises MissingNodeError' do
expect { result }.to raise_error(
Gitlab::Ci::YamlProcessor::Dag::MissingNodeError, 'node job_d is missing'
)
it 'ignores the missing ones and returns in a valid order' do
expect(result).to eq(%w(job_d job_f job_a job_c job_e job_b))
end
end
end
......@@ -2,69 +2,236 @@
require 'spec_helper'
RSpec.describe Ci::AfterRequeueJobService do
let_it_be(:project) { create(:project) }
RSpec.describe Ci::AfterRequeueJobService, :sidekiq_inline do
let_it_be(:project) { create(:project, :empty_repo) }
let_it_be(:user) { project.first_owner }
let(:pipeline) { create(:ci_pipeline, project: project) }
before_all do
project.repository.create_file(user, 'init', 'init', message: 'init', branch_name: 'master')
end
let!(:build1) { create(:ci_build, name: 'build1', pipeline: pipeline, stage_idx: 0) }
let!(:test1) { create(:ci_build, :success, name: 'test1', pipeline: pipeline, stage_idx: 1) }
let!(:test2) { create(:ci_build, :skipped, name: 'test2', pipeline: pipeline, stage_idx: 1) }
let!(:test3) { create(:ci_build, :skipped, :dependent, name: 'test3', pipeline: pipeline, stage_idx: 1, needed: build1) }
let!(:deploy) { create(:ci_build, :skipped, :dependent, name: 'deploy', pipeline: pipeline, stage_idx: 2, needed: test3) }
subject(:service) { described_class.new(project, user) }
subject(:execute_service) { described_class.new(project, user).execute(build1) }
context 'stage-dag mixed pipeline' do
let(:config) do
<<-EOY
stages: [a, b, c]
shared_examples 'processing subsequent skipped jobs' do
it 'marks subsequent skipped jobs as processable' do
expect(test1.reload).to be_success
expect(test2.reload).to be_skipped
expect(test3.reload).to be_skipped
expect(deploy.reload).to be_skipped
a1:
stage: a
script: exit $(($RANDOM % 2))
a2:
stage: a
script: exit 0
needs: [a1]
b1:
stage: b
script: exit 0
needs: []
b2:
stage: b
script: exit 0
needs: [a2]
execute_service
c1:
stage: c
script: exit 0
needs: [b2]
expect(test1.reload).to be_success
expect(test2.reload).to be_created
expect(test3.reload).to be_created
expect(deploy.reload).to be_created
c2:
stage: c
script: exit 0
EOY
end
let(:pipeline) do
Ci::CreatePipelineService.new(project, user, { ref: 'master' }).execute(:push).payload
end
it_behaves_like 'processing subsequent skipped jobs'
let(:a1) { find_job('a1') }
let(:b1) { find_job('b1') }
before do
stub_ci_pipeline_yaml_file(config)
check_jobs_statuses(
a1: 'pending',
a2: 'created',
b1: 'pending',
b2: 'created',
c1: 'created',
c2: 'created'
)
b1.success!
check_jobs_statuses(
a1: 'pending',
a2: 'created',
b1: 'success',
b2: 'created',
c1: 'created',
c2: 'created'
)
a1.drop!
check_jobs_statuses(
a1: 'failed',
a2: 'skipped',
b1: 'success',
b2: 'skipped',
c1: 'skipped',
c2: 'skipped'
)
context 'when there is a job need from the same stage' do
let!(:build2) do
create(:ci_build,
:skipped,
:dependent,
name: 'build2',
pipeline: pipeline,
stage_idx: 0,
scheduling_type: :dag,
needed: build1)
new_a1 = Ci::RetryBuildService.new(project, user).clone!(a1)
new_a1.enqueue!
check_jobs_statuses(
a1: 'pending',
a2: 'skipped',
b1: 'success',
b2: 'skipped',
c1: 'skipped',
c2: 'skipped'
)
end
shared_examples 'processing the same stage job' do
it 'marks subsequent skipped jobs as processable' do
expect { execute_service }.to change { build2.reload.status }.from('skipped').to('created')
execute_after_requeue_service(a1)
check_jobs_statuses(
a1: 'pending',
a2: 'created',
b1: 'success',
b2: 'created',
c1: 'created',
c2: 'created'
)
end
end
context 'stage-dag mixed pipeline with some same-stage needs' do
let(:config) do
<<-EOY
stages: [a, b, c]
a1:
stage: a
script: exit $(($RANDOM % 2))
a2:
stage: a
script: exit 0
needs: [a1]
b1:
stage: b
script: exit 0
needs: [b2]
b2:
stage: b
script: exit 0
c1:
stage: c
script: exit 0
needs: [b2]
c2:
stage: c
script: exit 0
EOY
end
it_behaves_like 'processing subsequent skipped jobs'
it_behaves_like 'processing the same stage job'
let(:pipeline) do
Ci::CreatePipelineService.new(project, user, { ref: 'master' }).execute(:push).payload
end
context 'when the pipeline is a downstream pipeline and the bridge is depended' do
let!(:trigger_job) { create(:ci_bridge, :strategy_depend, name: 'trigger_job', status: 'success') }
let(:a1) { find_job('a1') }
before do
create(:ci_sources_pipeline, pipeline: pipeline, source_job: trigger_job)
stub_ci_pipeline_yaml_file(config)
check_jobs_statuses(
a1: 'pending',
a2: 'created',
b1: 'created',
b2: 'created',
c1: 'created',
c2: 'created'
)
a1.drop!
check_jobs_statuses(
a1: 'failed',
a2: 'skipped',
b1: 'skipped',
b2: 'skipped',
c1: 'skipped',
c2: 'skipped'
)
new_a1 = Ci::RetryBuildService.new(project, user).clone!(a1)
new_a1.enqueue!
check_jobs_statuses(
a1: 'pending',
a2: 'skipped',
b1: 'skipped',
b2: 'skipped',
c1: 'skipped',
c2: 'skipped'
)
end
it 'marks subsequent skipped jobs as processable' do
execute_after_requeue_service(a1)
check_jobs_statuses(
a1: 'pending',
a2: 'created',
b1: 'created',
b2: 'created',
c1: 'created',
c2: 'created'
)
end
it 'marks source bridge as pending' do
expect { execute_service }.to change { trigger_job.reload.status }.from('success').to('pending')
context 'when the FF ci_fix_order_of_subsequent_jobs is disabled' do
before do
stub_feature_flags(ci_fix_order_of_subsequent_jobs: false)
end
it 'does not mark b1 as processable' do
execute_after_requeue_service(a1)
check_jobs_statuses(
a1: 'pending',
a2: 'created',
b1: 'skipped',
b2: 'created',
c1: 'created',
c2: 'created'
)
end
end
end
private
def find_job(name)
processables.find_by!(name: name)
end
def check_jobs_statuses(statuses)
expect(processables.order(:name).pluck(:name, :status)).to contain_exactly(*statuses.stringify_keys.to_a)
end
def processables
pipeline.processables.latest
end
def execute_after_requeue_service(processable)
service.execute(processable)
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment