Commit 180b3ff3 authored by Marius Bobin's avatar Marius Bobin Committed by Fabio Pitino

Bulk insert job tags when creating a CI pipelines

Changelog: performance
parent a33b8fdc
......@@ -10,6 +10,7 @@ module Ci
include Presentable
include Importable
include Ci::HasRef
extend ::Gitlab::Utils::Override
BuildArchivedError = Class.new(StandardError)
......@@ -723,6 +724,14 @@ module Ci
self.token && ActiveSupport::SecurityUtils.secure_compare(token, self.token)
end
# acts_as_taggable uses this method create/remove tags with contexts
# defined by taggings and to get those contexts it executes a query.
# We don't use any other contexts except `tags`, so we don't need it.
override :custom_contexts
def custom_contexts
[]
end
def tag_list
if tags.loaded?
tags.map(&:name)
......
......@@ -217,6 +217,10 @@ class CommitStatus < Ci::ApplicationRecord
false
end
def self.bulk_insert_tags!(statuses, tag_list_by_build)
Gitlab::Ci::Tags::BulkInsert.new(statuses, tag_list_by_build).insert!
end
def locking_enabled?
will_save_change_to_status?
end
......
---
name: ci_bulk_insert_tags
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/73198
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/346124
milestone: '14.6'
type: development
group: group::pipeline execution
default_enabled: false
......@@ -6,11 +6,17 @@ module Gitlab
module Chain
class Create < Chain::Base
include Chain::Helpers
include Gitlab::Utils::StrongMemoize
def perform!
logger.instrument(:pipeline_save) do
BulkInsertableAssociations.with_bulk_insert do
pipeline.save!
tags = extract_tag_list_by_status
pipeline.transaction do
pipeline.save!
CommitStatus.bulk_insert_tags!(statuses, tags) if bulk_insert_tags?
end
end
end
rescue ActiveRecord::RecordInvalid => e
......@@ -20,6 +26,37 @@ module Gitlab
def break?
!pipeline.persisted?
end
private
def statuses
strong_memoize(:statuses) do
pipeline.stages.flat_map(&:statuses)
end
end
# We call `job.tag_list=` to assign tags to the jobs from the
# Chain::Seed step which uses the `@tag_list` instance variable to
# store them on the record. We remove them here because we want to
# bulk insert them, otherwise they would be inserted and assigned one
# by one with callbacks. We must use `remove_instance_variable`
# because having the instance variable defined would still run the callbacks
def extract_tag_list_by_status
return {} unless bulk_insert_tags?
statuses.each.with_object({}) do |job, acc|
tag_list = job.clear_memoization(:tag_list)
next unless tag_list
acc[job.name] = tag_list
end
end
def bulk_insert_tags?
strong_memoize(:bulk_insert_tags) do
::Feature.enabled?(:ci_bulk_insert_tags, project, default_enabled: :yaml)
end
end
end
end
end
......
......@@ -200,11 +200,13 @@ module Gitlab
end
def runner_tags
{ tag_list: evaluate_runner_tags }.compact
strong_memoize(:runner_tags) do
{ tag_list: evaluate_runner_tags }.compact
end
end
def evaluate_runner_tags
@seed_attributes[:tag_list]&.map do |tag|
@seed_attributes.delete(:tag_list)&.map do |tag|
ExpandVariables.expand_existing(tag, -> { evaluate_context.variables_hash })
end
end
......
# frozen_string_literal: true
module Gitlab
module Ci
module Tags
class BulkInsert
TAGGINGS_BATCH_SIZE = 1000
TAGS_BATCH_SIZE = 500
def initialize(statuses, tag_list_by_status)
@statuses = statuses
@tag_list_by_status = tag_list_by_status
end
def insert!
return false if tag_list_by_status.empty?
persist_build_tags!
end
private
attr_reader :statuses, :tag_list_by_status
def persist_build_tags!
all_tags = tag_list_by_status.values.flatten.uniq.reject(&:blank?)
tag_records_by_name = create_tags(all_tags).index_by(&:name)
taggings = build_taggings_attributes(tag_records_by_name)
return false if taggings.empty?
taggings.each_slice(TAGGINGS_BATCH_SIZE) do |taggings_slice|
ActsAsTaggableOn::Tagging.insert_all!(taggings)
end
true
end
# rubocop: disable CodeReuse/ActiveRecord
def create_tags(tags)
existing_tag_records = ActsAsTaggableOn::Tag.where(name: tags).to_a
missing_tags = detect_missing_tags(tags, existing_tag_records)
return existing_tag_records if missing_tags.empty?
missing_tags
.map { |tag| { name: tag } }
.each_slice(TAGS_BATCH_SIZE) do |tags_attributes|
ActsAsTaggableOn::Tag.insert_all!(tags_attributes)
end
ActsAsTaggableOn::Tag.where(name: tags).to_a
end
# rubocop: enable CodeReuse/ActiveRecord
def build_taggings_attributes(tag_records_by_name)
taggings = statuses.flat_map do |status|
tag_list = tag_list_by_status[status.name]
next unless tag_list
tags = tag_records_by_name.values_at(*tag_list)
taggings_for(tags, status)
end
taggings.compact!
taggings
end
def taggings_for(tags, status)
tags.map do |tag|
{
tag_id: tag.id,
taggable_type: CommitStatus.name,
taggable_id: status.id,
created_at: Time.current,
context: 'tags'
}
end
end
def detect_missing_tags(tags, tag_records)
if tags.size != tag_records.size
tags - tag_records.map(&:name)
else
[]
end
end
end
end
end
end
......@@ -56,4 +56,74 @@ RSpec.describe Gitlab::Ci::Pipeline::Chain::Create do
.to include /Failed to persist the pipeline/
end
end
context 'tags persistence' do
let(:stage) do
build(:ci_stage_entity, pipeline: pipeline)
end
let(:job) do
build(:ci_build, stage: stage, pipeline: pipeline, project: project)
end
let(:bridge) do
build(:ci_bridge, stage: stage, pipeline: pipeline, project: project)
end
before do
pipeline.stages = [stage]
stage.statuses = [job, bridge]
end
context 'without tags' do
it 'extracts an empty tag list' do
expect(CommitStatus)
.to receive(:bulk_insert_tags!)
.with(stage.statuses, {})
.and_call_original
step.perform!
expect(job.instance_variable_defined?(:@tag_list)).to be_falsey
expect(job).to be_persisted
expect(job.tag_list).to eq([])
end
end
context 'with tags' do
before do
job.tag_list = %w[tag1 tag2]
end
it 'bulk inserts tags' do
expect(CommitStatus)
.to receive(:bulk_insert_tags!)
.with(stage.statuses, { job.name => %w[tag1 tag2] })
.and_call_original
step.perform!
expect(job.instance_variable_defined?(:@tag_list)).to be_falsey
expect(job).to be_persisted
expect(job.tag_list).to match_array(%w[tag1 tag2])
end
end
context 'when the feature flag is disabled' do
before do
job.tag_list = %w[tag1 tag2]
stub_feature_flags(ci_bulk_insert_tags: false)
end
it 'follows the old code path' do
expect(CommitStatus).not_to receive(:bulk_insert_tags!)
step.perform!
expect(job.instance_variable_defined?(:@tag_list)).to be_truthy
expect(job).to be_persisted
expect(job.reload.tag_list).to match_array(%w[tag1 tag2])
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Ci::Tags::BulkInsert do
let_it_be(:project) { create(:project, :repository) }
let_it_be(:pipeline) { create(:ci_pipeline, project: project) }
let_it_be_with_refind(:job) { create(:ci_build, :unique_name, pipeline: pipeline, project: project) }
let_it_be_with_refind(:other_job) { create(:ci_build, :unique_name, pipeline: pipeline, project: project) }
let_it_be_with_refind(:bridge) { create(:ci_bridge, pipeline: pipeline, project: project) }
let(:statuses) { [job, bridge, other_job] }
subject(:service) { described_class.new(statuses, tags_list) }
describe '#insert!' do
context 'without tags' do
let(:tags_list) { {} }
it { expect(service.insert!).to be_falsey }
end
context 'with tags' do
let(:tags_list) do
{
job.name => %w[tag1 tag2],
other_job.name => %w[tag2 tag3 tag4]
}
end
it 'persists tags' do
expect(service.insert!).to be_truthy
expect(job.reload.tag_list).to match_array(%w[tag1 tag2])
expect(other_job.reload.tag_list).to match_array(%w[tag2 tag3 tag4])
end
end
end
end
......@@ -958,4 +958,21 @@ RSpec.describe CommitStatus do
expect(build_from_other_pipeline.reload).to have_attributes(retried: false, processed: false)
end
end
describe '.bulk_insert_tags!' do
let(:statuses) { double('statuses') }
let(:tag_list_by_build) { double('tag list') }
let(:inserter) { double('inserter') }
it 'delegates to bulk insert class' do
expect(Gitlab::Ci::Tags::BulkInsert)
.to receive(:new)
.with(statuses, tag_list_by_build)
.and_return(inserter)
expect(inserter).to receive(:insert!)
described_class.bulk_insert_tags!(statuses, tag_list_by_build)
end
end
end
......@@ -7,16 +7,15 @@ RSpec.describe Ci::CreatePipelineService do
let_it_be(:user) { project.owner }
let(:ref) { 'refs/heads/master' }
let(:source) { :push }
let(:service) { described_class.new(project, user, { ref: ref }) }
let(:pipeline) { service.execute(source).payload }
let(:pipeline) { create_pipeline }
before do
stub_ci_pipeline_yaml_file(config)
stub_yaml_config(config)
end
context 'with valid config' do
let(:config) { YAML.dump({ test: { script: 'ls', tags: %w[tag1 tag2] } }) }
let(:config) { { test: { script: 'ls', tags: %w[tag1 tag2] } } }
it 'creates a pipeline', :aggregate_failures do
expect(pipeline).to be_created_successfully
......@@ -25,8 +24,8 @@ RSpec.describe Ci::CreatePipelineService do
end
context 'with too many tags' do
let(:tags) { Array.new(50) {|i| "tag-#{i}" } }
let(:config) { YAML.dump({ test: { script: 'ls', tags: tags } }) }
let(:tags) { build_tag_list(label: 'custom', size: 50) }
let(:config) { { test: { script: 'ls', tags: tags } } }
it 'creates a pipeline without builds', :aggregate_failures do
expect(pipeline).not_to be_created_successfully
......@@ -34,5 +33,167 @@ RSpec.describe Ci::CreatePipelineService do
expect(pipeline.yaml_errors).to eq("jobs:test:tags config must be less than the limit of #{Gitlab::Ci::Config::Entry::Tags::TAGS_LIMIT} tags")
end
end
context 'tags persistence' do
let(:config) do
{
build: {
script: 'ls',
stage: 'build',
tags: build_tag_list(label: 'build')
},
test: {
script: 'ls',
stage: 'test',
tags: build_tag_list(label: 'test')
}
}
end
let(:config_without_tags) do
config.transform_values { |job| job.except(:tags) }
end
context 'with multiple tags' do
context 'when the tags do not exist' do
it 'does not execute N+1 queries' do
stub_yaml_config(config_without_tags)
# warm up the cached objects so we get a more accurate count
create_pipeline
control = ActiveRecord::QueryRecorder.new(skip_cached: false) do
create_pipeline
end
stub_yaml_config(config)
# 2 select tags.*
# 1 insert tags
# 1 insert taggings
tags_queries_size = 4
expect { pipeline }
.not_to exceed_all_query_limit(control)
.with_threshold(tags_queries_size)
expect(pipeline).to be_created_successfully
end
end
context 'when the feature flag is disabled' do
before do
stub_feature_flags(ci_bulk_insert_tags: false)
end
it 'executes N+1s queries' do
stub_yaml_config(config_without_tags)
# warm up the cached objects so we get a more accurate count
create_pipeline
control = ActiveRecord::QueryRecorder.new(skip_cached: false) do
create_pipeline
end
stub_yaml_config(config)
expect { pipeline }
.to exceed_all_query_limit(control)
.with_threshold(4)
expect(pipeline).to be_created_successfully
end
end
context 'when tags are already persisted' do
it 'does not execute N+1 queries' do
# warm up the cached objects so we get a more accurate count
# and insert the tags
create_pipeline
control = ActiveRecord::QueryRecorder.new(skip_cached: false) do
create_pipeline
end
# 1 select tags.*
# 1 insert taggings
tags_queries_size = 2
expect { pipeline }
.not_to exceed_all_query_limit(control)
.with_threshold(tags_queries_size)
expect(pipeline).to be_created_successfully
end
end
end
context 'with bridge jobs' do
let(:config) do
{
test_1: {
script: 'ls',
stage: 'test',
tags: build_tag_list(label: 'test_1')
},
test_2: {
script: 'ls',
stage: 'test',
tags: build_tag_list(label: '$CI_JOB_NAME')
},
test_3: {
script: 'ls',
stage: 'test',
tags: build_tag_list(label: 'test_1') + build_tag_list(label: 'test_2')
},
test_4: {
script: 'ls',
stage: 'test'
},
deploy: {
stage: 'deploy',
trigger: 'my/project'
}
}
end
it do
expect(pipeline).to be_created_successfully
expect(pipeline.bridges.size).to eq(1)
expect(pipeline.builds.size).to eq(4)
expect(tags_for('test_1'))
.to have_attributes(count: 5)
.and all(match(/test_1-tag-\d+/))
expect(tags_for('test_2'))
.to have_attributes(count: 5)
.and all(match(/test_2-tag-\d+/))
expect(tags_for('test_3'))
.to have_attributes(count: 10)
.and all(match(/test_[1,2]-tag-\d+/))
expect(tags_for('test_4')).to be_empty
end
end
end
end
def tags_for(build_name)
pipeline.builds.find_by_name(build_name).tag_list
end
def stub_yaml_config(config)
stub_ci_pipeline_yaml_file(YAML.dump(config))
end
def create_pipeline
service.execute(:push).payload
end
def build_tag_list(label:, size: 5)
Array.new(size) { |index| "#{label}-tag-#{index}" }
end
end
......@@ -30,6 +30,7 @@
- "./spec/lib/gitlab/ci/pipeline/chain/create_spec.rb"
- "./spec/lib/gitlab/ci/pipeline/chain/seed_block_spec.rb"
- "./spec/lib/gitlab/ci/pipeline/seed/build_spec.rb"
- "./spec/lib/gitlab/ci/tags/bulk_insert_spec.rb"
- "./spec/lib/gitlab/ci/templates/5_minute_production_app_ci_yaml_spec.rb"
- "./spec/lib/gitlab/ci/templates/AWS/deploy_ecs_gitlab_ci_yaml_spec.rb"
- "./spec/lib/gitlab/ci/templates/Jobs/deploy_gitlab_ci_yaml_spec.rb"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment