Commit 971e91c1 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch 'fix-merge-train-race-condition' into 'master'

Use state machine in Merge Train to avoid race conditions

Closes #196133

See merge request gitlab-org/gitlab!23395
parents 6180d91a c1977621
......@@ -13,7 +13,7 @@
:feature_category: :continuous_delivery
:has_external_dependencies:
:latency_sensitive:
:resource_boundary: :unknown
:resource_boundary: :cpu
:weight: 3
- :name: chaos:chaos_cpu_spin
:feature_category: :chaos_engineering
......
......@@ -5,6 +5,7 @@ class AutoMergeProcessWorker
queue_namespace :auto_merge
feature_category :continuous_delivery
worker_resource_boundary :cpu
def perform(merge_request_id)
MergeRequest.find_by_id(merge_request_id).try do |merge_request|
......
---
title: Use state machine in Merge Train to avoid race conditions
merge_request: 23395
author:
type: fixed
# frozen_string_literal: true
class AddDurationToMergeTrains < ActiveRecord::Migration[5.2]
DOWNTIME = false
def change
add_column :merge_trains, :merged_at, :datetime_with_timezone
add_column :merge_trains, :duration, :integer
end
end
......@@ -2624,6 +2624,8 @@ ActiveRecord::Schema.define(version: 2020_01_30_161817) do
t.integer "target_project_id", null: false
t.text "target_branch", null: false
t.integer "status", limit: 2, default: 0, null: false
t.datetime_with_timezone "merged_at"
t.integer "duration"
t.index ["merge_request_id"], name: "index_merge_trains_on_merge_request_id", unique: true
t.index ["pipeline_id"], name: "index_merge_trains_on_pipeline_id"
t.index ["target_project_id", "target_branch", "status"], name: "index_for_status_per_branch_per_project"
......
......@@ -62,14 +62,6 @@ module EE
accepts_nested_attributes_for :approval_rules, allow_destroy: true
state_machine :state_id do
after_transition any => :merged do |merge_request|
merge_request.merge_train&.merged!
true
end
end
scope :order_review_time_desc, -> do
joins(:metrics).reorder(::Gitlab::Database.nulls_last_order('merge_request_metrics.first_comment_at'))
end
......
......@@ -3,16 +3,14 @@
class MergeTrain < ApplicationRecord
include AfterCommitQueue
ACTIVE_STATUSES = %w[created stale fresh].freeze
ACTIVE_STATUSES = %w[idle stale fresh].freeze
COMPLETE_STATUSES = %w[merged merging].freeze
belongs_to :target_project, class_name: "Project"
belongs_to :merge_request, inverse_of: :merge_train
belongs_to :user
belongs_to :pipeline, class_name: 'Ci::Pipeline'
after_commit :cleanup_ref, if: -> { saved_change_to_status? && merged? }
after_commit :refresh_async, if: -> { saved_change_to_status? && stale? }
after_destroy do |merge_train|
run_after_commit do
merge_train.pipeline&.cancel_running(retries: 1)
......@@ -20,10 +18,55 @@ class MergeTrain < ApplicationRecord
end
end
enum status: %i[created merged stale fresh]
state_machine :status, initial: :idle do
event :refresh_pipeline do
transition %i[idle stale fresh] => :fresh
end
event :outdate_pipeline do
transition fresh: :stale
end
event :start_merge do
transition fresh: :merging
end
event :finish_merge do
transition merging: :merged
end
before_transition on: :refresh_pipeline do |merge_train, transition|
pipeline_id = transition.args.first
merge_train.pipeline_id = pipeline_id
end
before_transition any => :merged do |merge_train|
merged_at = Time.zone.now
merge_train.merged_at = merged_at
merge_train.duration = merged_at - merge_train.created_at
end
scope :active, -> { where(status: active_statuses) }
scope :merged, -> { where(status: merged_statuses) }
after_transition fresh: :stale do |merge_train|
merge_train.run_after_commit do
merge_train.refresh_async
end
end
after_transition merging: :merged do |merge_train|
merge_train.run_after_commit do
merge_train.cleanup_ref
end
end
state :idle, value: 0
state :merged, value: 1
state :stale, value: 2
state :fresh, value: 3
state :merging, value: 4
end
scope :active, -> { with_status(*ACTIVE_STATUSES) }
scope :complete, -> { with_status(*COMPLETE_STATUSES) }
scope :for_target, -> (project_id, branch) { where(target_project_id: project_id, target_branch: branch) }
scope :by_id, -> { order('merge_trains.id ASC') }
......@@ -47,12 +90,12 @@ class MergeTrain < ApplicationRecord
all_active_mrs_in_train(merge_request.target_project_id, merge_request.target_branch).where(id: merge_request_ids).first
end
def last_merged_mr_in_train(target_project_id, target_branch)
MergeRequest.where(id: last_merged_merge_train(target_project_id, target_branch)).take
def last_complete_mr_in_train(target_project_id, target_branch)
MergeRequest.where(id: last_complete_merge_train(target_project_id, target_branch)).take
end
def sha_exists_in_history?(target_project_id, target_branch, newrev, limit: 20)
MergeRequest.exists?(id: merged_merge_trains(target_project_id, target_branch, limit: limit),
MergeRequest.exists?(id: complete_merge_trains(target_project_id, target_branch, limit: limit),
merge_commit_sha: newrev)
end
......@@ -60,14 +103,6 @@ class MergeTrain < ApplicationRecord
all_active_mrs_in_train(merge_request.target_project_id, merge_request.target_branch).count
end
def active_statuses
statuses.values_at(*ACTIVE_STATUSES)
end
def merged_statuses
statuses.values_at(:merged)
end
private
def first_merge_request_ids(project)
......@@ -77,13 +112,13 @@ class MergeTrain < ApplicationRecord
.order(:target_branch, :id)
end
def last_merged_merge_train(target_project_id, target_branch)
merged_merge_trains(target_project_id, target_branch, limit: 1)
def last_complete_merge_train(target_project_id, target_branch)
complete_merge_trains(target_project_id, target_branch, limit: 1)
end
def merged_merge_trains(target_project_id, target_branch, limit:)
def complete_merge_trains(target_project_id, target_branch, limit:)
MergeTrain.for_target(target_project_id, target_branch)
.merged.order(id: :desc).select(:merge_request_id).limit(limit)
.complete.order(id: :desc).select(:merge_request_id).limit(limit)
end
end
......@@ -120,11 +155,9 @@ class MergeTrain < ApplicationRecord
end
def active?
ACTIVE_STATUSES.include?(status)
ACTIVE_STATUSES.include?(status_name.to_s)
end
private
def refresh_async
AutoMergeProcessWorker.perform_async(merge_request_id)
end
......
......@@ -26,7 +26,7 @@ module AutoMerge
super do
if merge_request.merge_train&.destroy
SystemNoteService.cancel_merge_train(merge_request, project, current_user)
next_merge_request.merge_train.stale! if next_merge_request
next_merge_request.merge_train.outdate_pipeline if next_merge_request
end
end
end
......@@ -39,7 +39,7 @@ module AutoMerge
super(merge_request, reason) do
if merge_request.merge_train&.destroy
SystemNoteService.abort_merge_train(merge_request, project, current_user, reason)
next_merge_request.merge_train.stale! if next_merge_request && process_next
next_merge_request.merge_train.outdate_pipeline if next_merge_request && process_next
end
end
end
......
......@@ -48,6 +48,8 @@ module EE
# rubocop:disable Gitlab/ModuleWithInstanceVariables
def check_merge_train_status
return unless @push.branch_updated?
MergeTrains::CheckStatusService.new(project, current_user)
.execute(project, @push.branch_name, @push.newrev)
end
......
......@@ -9,7 +9,7 @@ module MergeTrains
# that means there was an unexpected commit came from out of merge train cycle.
unless MergeTrain.sha_exists_in_history?(target_project.id, target_branch, newrev)
merge_request = MergeTrain.first_in_train(target_project.id, target_branch)
merge_request.merge_train.stale! if merge_request
merge_request.merge_train.outdate_pipeline if merge_request
end
end
end
......
......@@ -33,10 +33,6 @@ module MergeTrains
raise ProcessError, 'merge request is not on a merge train'
end
if merge_request.squash? && Feature.disabled?(:merge_train_new_stale_check, project, default_enabled: true)
raise ProcessError, 'merge train does not support squash merge'
end
unless merge_request.mergeable_state?(skip_ci_check: true)
raise ProcessError, 'merge request is not mergeable'
end
......@@ -52,9 +48,6 @@ module MergeTrains
end
end
# Since `stale_pipeline?` is expensive process which requires multiple Gitaly calls,
# each refresh service relays `require_recreate` flag whether the next
# merge request obviously requires to re-create pipeline for merge train.
def should_create_pipeline?
pipeline_absent? || require_recreate? || stale_pipeline?
end
......@@ -70,41 +63,22 @@ module MergeTrains
end
def should_merge?
first_in_train? && pipeline_for_merge_train&.success?
pipeline_for_merge_train&.success? && first_in_train?
end
def merge!
merge_train.start_merge!
MergeRequests::MergeService.new(project, merge_user, merge_request.merge_params)
.execute(merge_request)
raise ProcessError, "failed to merge. #{merge_request.merge_error}" unless merge_request.merged?
end
def stale_pipeline?
if Feature.enabled?(:merge_train_new_stale_check, project, default_enabled: true)
merge_train.stale?
else
legacy_stale_pipeline?
end
merge_train.finish_merge!
end
# NOTE: This method works for both no-ff-merge and ff-merge, however,
# it doesn't work for squash and merge option.
def legacy_stale_pipeline?
return true unless pipeline_for_merge_train.source_sha == merge_request.diff_head_sha
return false if pipeline_for_merge_train.target_sha == previous_ref_sha
##
# Now `pipeline.target_sha` and `previous_ref_sha` are different. This case
# happens in the following cases:
# 1. Previous sha has a completely different history from the pipeline.target_sha.
# e.g. Previous merge request was dropped from the merge train.
# 2. Previous sha has exactly the same history with the pipeline.target_sha.
# e.g. Previous merge request was merged into target branch with no-ff option.
#
# We distinguish these two cases by comparing parent commits.
commits = merge_request.project.commits_by(oids: [pipeline_for_merge_train.target_sha, previous_ref_sha])
commits[0].parent_ids != commits[1].parent_ids
def stale_pipeline?
merge_train.stale?
end
def pipeline_absent?
......@@ -128,8 +102,7 @@ module MergeTrains
end
def update_pipeline_for_merge_train(pipeline)
merge_train.pipeline = pipeline
merge_train.fresh!
merge_train.refresh_pipeline!(pipeline.id)
end
def merge_user
......
......@@ -15,16 +15,6 @@ module MergeTrains
def execute(merge_request)
return unless merge_request.on_train?
if Feature.enabled?(:merge_trains_efficient_refresh, default_enabled: true)
efficient_refresh(merge_request)
else
legacy_refresh(merge_request)
end
end
private
def efficient_refresh(merge_request)
queue = Gitlab::BatchPopQueueing.new('merge_trains', queue_id(merge_request))
result = queue.safe_execute([merge_request.id], lock_timeout: 15.minutes) do |items|
......@@ -39,11 +29,7 @@ module MergeTrains
end
end
def legacy_refresh(merge_request)
in_lock("merge_train:#{merge_request.target_project_id}-#{merge_request.target_branch}") do
unsafe_refresh(merge_request)
end
end
private
def unsafe_refresh(merge_request)
require_next_recreate = false
......
......@@ -11,7 +11,7 @@ FactoryBot.modify do
trait :on_train do
transient do
train_creator { author }
status { 'created' }
status { 'idle' }
end
auto_merge_enabled { true }
......
......@@ -8,20 +8,24 @@ FactoryBot.define do
user
pipeline factory: :ci_pipeline
trait :created do
status { :created }
trait :idle do
status { MergeTrain.state_machines[:status].states[:idle].value }
end
trait :merged do
status { :merged }
status { MergeTrain.state_machines[:status].states[:merged].value }
end
trait :merging do
status { MergeTrain.state_machines[:status].states[:merging].value }
end
trait :stale do
status { :stale }
status { MergeTrain.state_machines[:status].states[:stale].value }
end
trait :fresh do
status { :fresh }
status { MergeTrain.state_machines[:status].states[:fresh].value }
end
end
end
......@@ -238,7 +238,7 @@ describe 'Two merge requests on a merge train' do
end
it 'does not recreate pipeline when merge request 1 refreshed again' do
expect { merge_request_1.merge_train.send(:refresh_async) }
expect { AutoMergeProcessWorker.perform_async(merge_request_1.id) }
.not_to change { merge_request_1.all_pipelines.count }
end
......
......@@ -748,7 +748,9 @@ describe MergeRequest do
context 'when the merge request was on a merge train' do
let(:merge_request) do
create(:merge_request, :on_train, status: 'merged', source_project: project, target_project: project)
create(:merge_request, :on_train,
status: MergeTrain.state_machines[:status].states[:merged].value,
source_project: project, target_project: project)
end
it { is_expected.to be_falsy }
......@@ -763,34 +765,6 @@ describe MergeRequest do
end
end
describe 'state machine' do
context 'when the merge request is on a merge train' do
let(:merge_request) do
create(:merge_request, :on_train, source_project: project, target_project: project)
end
context 'when the merge request is merged' do
it 'ensures to finish merge train' do
expect(merge_request.merge_train).to receive(:merged!)
merge_request.mark_as_merged!
end
end
end
context 'when the merge request is not on a merge train' do
let(:merge_request) do
create(:merge_request, source_project: project, target_project: project)
end
context 'when the merge request is merged' do
it 'does not raise error' do
expect { merge_request.mark_as_merged! }.not_to raise_error
end
end
end
end
describe 'review time sorting' do
it 'orders by first_comment_at' do
merge_request_1 = create(:merge_request, :with_productivity_metrics, metrics_data: { first_comment_at: 1.day.ago })
......
This diff is collapsed.
......@@ -160,9 +160,12 @@ describe AutoMerge::MergeTrainService do
let!(:merge_request_2) do
create(:merge_request, :on_train,
source_project: project, source_branch: 'signed-commits',
target_project: project, target_branch: 'master')
target_project: project, target_branch: 'master',
status: status)
end
let(:status) { MergeTrain.state_machines[:status].states[:fresh].value }
it 'processes the next merge request on the train by default' do
expect(AutoMergeProcessWorker).to receive(:perform_async).with(merge_request_2.id)
......@@ -170,6 +173,18 @@ describe AutoMerge::MergeTrainService do
expect(merge_request_2.reset.merge_train).to be_stale
end
context 'when the status is stale already' do
let(:status) { MergeTrain.state_machines[:status].states[:stale].value }
it 'does not do anything' do
expect(AutoMergeProcessWorker).not_to receive(:perform_async).with(merge_request_2.id)
expect { subject }.not_to raise_error
expect(merge_request_2.reset.merge_train).to be_stale
end
end
end
end
......@@ -208,7 +223,8 @@ describe AutoMerge::MergeTrainService do
let!(:merge_request_2) do
create(:merge_request, :on_train,
source_project: project, source_branch: 'signed-commits',
target_project: project, target_branch: 'master')
target_project: project, target_branch: 'master',
status: MergeTrain.state_machines[:status].states[:fresh].value)
end
it 'processes the next merge request on the train' do
......
......@@ -50,6 +50,16 @@ describe MergeRequests::RefreshService do
subject
end
context 'when branch is deleted' do
let(:newrev) { Gitlab::Git::BLANK_SHA }
it 'does not check merge train status' do
expect(MergeTrains::CheckStatusService).not_to receive(:new)
subject
end
end
context '#update_approvers' do
let(:owner) { create(:user) }
let(:current_user) { merge_request.author }
......
......@@ -25,7 +25,7 @@ describe MergeTrains::CheckStatusService do
create(:merge_request, :on_train, train_creator: maintainer,
source_branch: 'feature', source_project: project,
target_branch: 'master', target_project: project,
merge_status: 'unchecked')
merge_status: 'unchecked', status: MergeTrain.state_machines[:status].states[:merged].value)
end
let!(:active_merge_request) do
......@@ -43,9 +43,9 @@ describe MergeTrains::CheckStatusService do
context 'when new revision is included in merge train history' do
let!(:merge_commit_sha_1) { Digest::SHA1.hexdigest 'test' }
it 'does not mark merge train as stale' do
it 'does not outdate the merge train pipeline' do
expect(MergeTrain).to receive(:sha_exists_in_history?).and_return(true).and_call_original
expect_any_instance_of(MergeTrain).not_to receive(:stale!)
expect_any_instance_of(MergeTrain).not_to receive(:outdate_pipeline)
subject
end
......@@ -54,9 +54,9 @@ describe MergeTrains::CheckStatusService do
context 'when new revision is not included in merge train history' do
let!(:merge_commit_sha_1) { Digest::SHA1.hexdigest 'other' }
it 'marks the merge train as stale' do
it 'outdates the merge train pipeline' do
expect(MergeTrain).to receive(:sha_exists_in_history?).and_return(false).and_call_original
expect_any_instance_of(MergeTrain).to receive(:stale!)
expect_any_instance_of(MergeTrain).to receive(:outdate_pipeline)
subject
end
......
......@@ -122,21 +122,9 @@ describe MergeTrains::RefreshMergeRequestService do
merge_request.update!(squash: true)
end
context 'when merge_train_new_stale_check feature flag is enabled' do
let(:pipeline) { create(:ci_pipeline) }
it_behaves_like 'creates a pipeline for merge train'
end
context 'when merge_train_new_stale_check feature flag is disabled' do
before do
stub_feature_flags(merge_train_new_stale_check: false)
end
let(:pipeline) { create(:ci_pipeline) }
it_behaves_like 'drops the merge request from the merge train' do
let(:expected_reason) { 'merge train does not support squash merge' }
end
end
it_behaves_like 'creates a pipeline for merge train'
end
context 'when previous ref is not found' do
......@@ -174,7 +162,7 @@ describe MergeTrains::RefreshMergeRequestService do
let(:previous_ref_sha) { project.repository.commit('refs/heads/master').sha }
before do
merge_request.merge_train.update!(pipeline: pipeline)
merge_request.merge_train.refresh_pipeline!(pipeline.id)
end
context 'when the pipeline is not stale' do
......@@ -182,23 +170,11 @@ describe MergeTrains::RefreshMergeRequestService do
end
context 'when the pipeline is stale' do
context 'when merge_train_new_stale_check feature flag is enabled' do
before do
merge_request.merge_train.update_column(:status, :stale)
end
it_behaves_like 'cancels and recreates a pipeline for the merge train'
before do
merge_request.merge_train.update_column(:status, MergeTrain.state_machines[:status].states[:stale].value)
end
context 'when merge_train_new_stale_check feature flag is disabled' do
before do
stub_feature_flags(merge_train_new_stale_check: false)
end
let(:previous_ref_sha) { project.repository.commits('refs/heads/master', limit: 2).last.sha }
it_behaves_like 'cancels and recreates a pipeline for the merge train'
end
it_behaves_like 'cancels and recreates a pipeline for the merge train'
end
context 'when the pipeline is required to be recreated' do
......@@ -213,7 +189,7 @@ describe MergeTrains::RefreshMergeRequestService do
let(:previous_ref_sha) { project.repository.commit('refs/heads/master').sha }
before do
merge_request.merge_train.pipeline = pipeline
merge_request.merge_train.refresh_pipeline!(pipeline.id)
merge_request.merge_params[:sha] = merge_request.diff_head_sha
merge_request.save
end
......@@ -221,12 +197,14 @@ describe MergeTrains::RefreshMergeRequestService do
context 'when the merge request is the first queue' do
it 'merges the merge request' do
expect(merge_request).to receive(:cleanup_refs).with(only: :train)
expect(merge_request.merge_train).to receive(:start_merge!).and_call_original
expect(merge_request.merge_train).to receive(:finish_merge!).and_call_original
expect_next_instance_of(MergeRequests::MergeService, project, maintainer, anything) do |service|
expect(service).to receive(:execute).with(merge_request).and_call_original
end
expect { subject }
.to change { merge_request.merge_train.status }.from('created').to('merged')
.to change { merge_request.merge_train.status_name }.from(:fresh).to(:merged)
end
context 'when it failed to merge the merge request' do
......@@ -236,6 +214,16 @@ describe MergeTrains::RefreshMergeRequestService do
allow_any_instance_of(MergeRequests::MergeService).to receive(:execute) { { result: :error } }
end
it 'does not finish merge and drops the merge request from train' do
expect(merge_request).to be_on_train
expect(merge_request.merge_train).to receive(:start_merge!).and_call_original
expect(merge_request.merge_train).not_to receive(:finish_merge!)
subject
expect(merge_request).not_to be_on_train
end
it_behaves_like 'drops the merge request from the merge train' do
let(:expected_reason) { 'failed to merge. Branch has been updated since the merge was requested.' }
end
......
......@@ -115,7 +115,7 @@ describe MergeTrains::RefreshMergeRequestsService do
context 'when merge request 1 was on a merge train' do
before do
allow(merge_request_1.merge_train).to receive(:cleanup_ref)
merge_request_1.merge_train.merged!
merge_request_1.merge_train.update_column(:status, MergeTrain.state_machines[:status].states[:merged].value)
end
it 'does not refresh' do
......@@ -146,26 +146,6 @@ describe MergeTrains::RefreshMergeRequestsService do
subject
end
end
context 'when merge_trains_efficient_refresh is disabled' do
before do
stub_feature_flags(merge_trains_efficient_refresh: false)
end
context 'when the exclusive lock has already been taken' do
let(:lease_key) do
"merge_train:#{merge_request_1.target_project_id}-#{merge_request_1.target_branch}"
end
before do
stub_exclusive_lease_taken(lease_key)
end
it 'raises FailedToObtainLockError' do
expect { subject }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
end
end
end
end
context 'when merge request 2 is passed' do
......@@ -194,7 +174,7 @@ describe MergeTrains::RefreshMergeRequestsService do
context 'when merge request 1 has already been merged' do
before do
allow(merge_request_1.merge_train).to receive(:cleanup_ref)
merge_request_1.merge_train.merged!
merge_request_1.merge_train.update_column(:status, MergeTrain.state_machines[:status].states[:merged].value)
end
it 'does not refresh the merge request 1' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment