Commit 3f44cdcf authored by Matthias Käppler's avatar Matthias Käppler

Merge branch 'drop-pipelines-async-when-user-blocked' into 'master'

Drop user pipelines async when user is blocked

See merge request gitlab-org/gitlab!59129
parents 425380d8 8a403acb
...@@ -354,7 +354,7 @@ class User < ApplicationRecord ...@@ -354,7 +354,7 @@ class User < ApplicationRecord
# this state transition object in order to do a rollback. # this state transition object in order to do a rollback.
# For this reason the tradeoff is to disable this cop. # For this reason the tradeoff is to disable this cop.
after_transition any => :blocked do |user| after_transition any => :blocked do |user|
Ci::AbortPipelinesService.new.execute(user.pipelines, :user_blocked) Ci::DropPipelineService.new.execute_async_for_all(user.pipelines, :user_blocked, user)
Ci::DisableUserPipelineSchedulesService.new.execute(user) Ci::DisableUserPipelineSchedulesService.new.execute(user)
end end
# rubocop: enable CodeReuse/ServiceClass # rubocop: enable CodeReuse/ServiceClass
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
module Ci module Ci
class AbortPipelinesService class AbortPipelinesService
# NOTE: This call fails pipelines in bulk without running callbacks. # NOTE: This call fails pipelines in bulk without running callbacks.
# Only for pipeline abandonment scenarios (examples: project delete, user block) # Only for pipeline abandonment scenarios (examples: project delete)
def execute(pipelines, failure_reason) def execute(pipelines, failure_reason)
pipelines.cancelable.each_batch(of: 100) do |pipeline_batch| pipelines.cancelable.each_batch(of: 100) do |pipeline_batch|
now = Time.current now = Time.current
......
# frozen_string_literal: true
module Ci
class DropPipelineService
# execute service asynchronously for each cancelable pipeline
def execute_async_for_all(pipelines, failure_reason, context_user)
pipelines.cancelable.select(:id).find_in_batches do |pipelines_batch|
Ci::DropPipelineWorker.bulk_perform_async_with_contexts(
pipelines_batch,
arguments_proc: -> (pipeline) { [pipeline.id, failure_reason] },
context_proc: -> (_) { { user: context_user } }
)
end
end
def execute(pipeline, failure_reason, retries: 3)
Gitlab::OptimisticLocking.retry_lock(pipeline.cancelable_statuses, retries, name: 'ci_pipeline_drop_running') do |cancelables|
cancelables.find_in_batches do |batch|
preload_associations_for_drop(batch)
batch.each do |job|
job.drop(failure_reason)
end
end
end
end
private
def preload_associations_for_drop(builds_batch)
ActiveRecord::Associations::Preloader.new.preload( # rubocop: disable CodeReuse/ActiveRecord
builds_batch,
[:project, :pipeline, :metadata, :deployment, :taggings]
)
end
end
end
...@@ -1251,6 +1251,14 @@ ...@@ -1251,6 +1251,14 @@
:weight: 3 :weight: 3
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: pipeline_default:ci_drop_pipeline
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 3
:idempotent: true
:tags: []
- :name: pipeline_default:ci_merge_requests_add_todo_when_build_fails - :name: pipeline_default:ci_merge_requests_add_todo_when_build_fails
:feature_category: :continuous_integration :feature_category: :continuous_integration
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true
module Ci
class DropPipelineWorker
include ApplicationWorker
include PipelineQueue
idempotent!
def perform(pipeline_id, failure_reason)
Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
Ci::DropPipelineService.new.execute(pipeline, failure_reason.to_sym)
end
end
end
end
---
title: Drop user pipelines async when user is blocked
merge_request: 59129
author:
type: fixed
...@@ -1803,8 +1803,8 @@ RSpec.describe User do ...@@ -1803,8 +1803,8 @@ RSpec.describe User do
it 'aborts all running pipelines and related jobs' do it 'aborts all running pipelines and related jobs' do
expect(user).to receive(:pipelines).and_return(pipelines) expect(user).to receive(:pipelines).and_return(pipelines)
expect(Ci::AbortPipelinesService).to receive(:new).and_return(service) expect(Ci::DropPipelineService).to receive(:new).and_return(service)
expect(service).to receive(:execute).with(pipelines, :user_blocked) expect(service).to receive(:execute_async_for_all).with(pipelines, :user_blocked, user)
user.block user.block
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::DropPipelineService do
let_it_be(:user) { create(:user) }
let(:failure_reason) { :user_blocked }
let!(:cancelable_pipeline) { create(:ci_pipeline, :running, user: user) }
let!(:running_build) { create(:ci_build, :running, pipeline: cancelable_pipeline) }
let!(:success_pipeline) { create(:ci_pipeline, :success, user: user) }
let!(:success_build) { create(:ci_build, :success, pipeline: success_pipeline) }
describe '#execute_async_for_all' do
subject { described_class.new.execute_async_for_all(user.pipelines, failure_reason, user) }
it 'drops only cancelable pipelines asynchronously', :sidekiq_inline do
subject
expect(cancelable_pipeline.reload).to be_failed
expect(running_build.reload).to be_failed
expect(success_pipeline.reload).to be_success
expect(success_build.reload).to be_success
end
end
describe '#execute' do
subject { described_class.new.execute(cancelable_pipeline.id, failure_reason) }
def drop_pipeline!(pipeline)
described_class.new.execute(pipeline, failure_reason)
end
it 'drops each cancelable build in the pipeline', :aggregate_failures do
drop_pipeline!(cancelable_pipeline)
expect(running_build.reload).to be_failed
expect(running_build.failure_reason).to eq(failure_reason.to_s)
expect(success_build.reload).to be_success
end
it 'avoids N+1 queries when reading data' do
control_count = ActiveRecord::QueryRecorder.new do
drop_pipeline!(cancelable_pipeline)
end.count
writes_per_build = 2
expected_reads_count = control_count - writes_per_build
create_list(:ci_build, 5, :running, pipeline: cancelable_pipeline)
expect do
drop_pipeline!(cancelable_pipeline)
end.not_to exceed_query_limit(expected_reads_count + (5 * writes_per_build))
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::DropPipelineWorker do
include AfterNextHelpers
let(:pipeline) { create(:ci_pipeline, :running) }
let(:failure_reason) { :user_blocked }
describe '#perform' do
subject { described_class.new.perform(pipeline.id, failure_reason) }
it 'calls delegates to the service' do
expect_next(Ci::DropPipelineService).to receive(:execute).with(pipeline, failure_reason)
subject
end
it_behaves_like 'an idempotent worker' do
let!(:running_build) { create(:ci_build, :running, pipeline: pipeline) }
let!(:success_build) { create(:ci_build, :success, pipeline: pipeline) }
let(:job_args) { [pipeline.id, failure_reason] }
it 'executes the service', :aggregate_failures do
subject
expect(running_build.reload).to be_failed
expect(running_build.failure_reason).to eq(failure_reason.to_s)
expect(success_build.reload).to be_success
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment