Commit 58fecbd3 authored by Heinrich Lee Yu's avatar Heinrich Lee Yu

Merge branch 'expire-job-and-pipeline-cache-synchronously' into 'master'

Expire job and pipeline cache synchronously

See merge request gitlab-org/gitlab!75611
parents de212bdf aba37a39
...@@ -236,9 +236,14 @@ module Ci ...@@ -236,9 +236,14 @@ module Ci
pipeline.run_after_commit do pipeline.run_after_commit do
PipelineHooksWorker.perform_async(pipeline.id) PipelineHooksWorker.perform_async(pipeline.id)
if Feature.enabled?(:expire_job_and_pipeline_cache_synchronously, pipeline.project, default_enabled: :yaml)
Ci::ExpirePipelineCacheService.new.execute(pipeline) # rubocop: disable CodeReuse/ServiceClass
else
ExpirePipelineCacheWorker.perform_async(pipeline.id) ExpirePipelineCacheWorker.perform_async(pipeline.id)
end end
end end
end
after_transition any => ::Ci::Pipeline.completed_statuses do |pipeline| after_transition any => ::Ci::Pipeline.completed_statuses do |pipeline|
pipeline.run_after_commit do pipeline.run_after_commit do
......
...@@ -188,9 +188,14 @@ class CommitStatus < Ci::ApplicationRecord ...@@ -188,9 +188,14 @@ class CommitStatus < Ci::ApplicationRecord
commit_status.run_after_commit do commit_status.run_after_commit do
PipelineProcessWorker.perform_async(pipeline_id) unless transition_options[:skip_pipeline_processing] PipelineProcessWorker.perform_async(pipeline_id) unless transition_options[:skip_pipeline_processing]
if Feature.enabled?(:expire_job_and_pipeline_cache_synchronously, project, default_enabled: :yaml)
expire_etag_cache!
else
ExpireJobCacheWorker.perform_async(id) ExpireJobCacheWorker.perform_async(id)
end end
end end
end
after_transition any => :failed do |commit_status| after_transition any => :failed do |commit_status|
commit_status.run_after_commit do commit_status.run_after_commit do
...@@ -301,6 +306,12 @@ class CommitStatus < Ci::ApplicationRecord ...@@ -301,6 +306,12 @@ class CommitStatus < Ci::ApplicationRecord
.update_all(retried: true, processed: true) .update_all(retried: true, processed: true)
end end
def expire_etag_cache!
job_path = Gitlab::Routing.url_helpers.project_build_path(project, id, format: :json)
Gitlab::EtagCaching::Store.new.touch(job_path)
end
private private
def unrecoverable_failure? def unrecoverable_failure?
......
...@@ -74,20 +74,25 @@ module Ci ...@@ -74,20 +74,25 @@ module Ci
def update_etag_cache(pipeline, store) def update_etag_cache(pipeline, store)
project = pipeline.project project = pipeline.project
store.touch(project_pipelines_path(project)) etag_paths = [
store.touch(commit_pipelines_path(project, pipeline.commit)) unless pipeline.commit.nil? project_pipelines_path(project),
store.touch(new_merge_request_pipelines_path(project)) new_merge_request_pipelines_path(project),
graphql_project_on_demand_scan_counts_path(project)
]
etag_paths << commit_pipelines_path(project, pipeline.commit) unless pipeline.commit.nil?
each_pipelines_merge_request_path(pipeline) do |path| each_pipelines_merge_request_path(pipeline) do |path|
store.touch(path) etag_paths << path
end end
pipeline.self_with_upstreams_and_downstreams.each do |relative_pipeline| pipeline.self_with_upstreams_and_downstreams.includes(project: [:route, { namespace: :route }]).each do |relative_pipeline| # rubocop: disable CodeReuse/ActiveRecord
store.touch(project_pipeline_path(relative_pipeline.project, relative_pipeline)) etag_paths << project_pipeline_path(relative_pipeline.project, relative_pipeline)
store.touch(graphql_pipeline_path(relative_pipeline)) etag_paths << graphql_pipeline_path(relative_pipeline)
store.touch(graphql_pipeline_sha_path(relative_pipeline.sha)) etag_paths << graphql_pipeline_sha_path(relative_pipeline.sha)
end end
store.touch(graphql_project_on_demand_scan_counts_path(project)) store.touch(*etag_paths)
end end
def url_helpers def url_helpers
......
...@@ -36,6 +36,10 @@ module Ci ...@@ -36,6 +36,10 @@ module Ci
update_pipeline! update_pipeline!
update_statuses_processed! update_statuses_processed!
if Feature.enabled?(:expire_job_and_pipeline_cache_synchronously, pipeline.project, default_enabled: :yaml)
Ci::ExpirePipelineCacheService.new.execute(pipeline)
end
true true
end end
......
...@@ -15,19 +15,10 @@ class ExpireJobCacheWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -15,19 +15,10 @@ class ExpireJobCacheWorker # rubocop:disable Scalability/IdempotentWorker
idempotent! idempotent!
def perform(job_id) def perform(job_id)
job = CommitStatus.preload(:pipeline, :project).find_by_id(job_id) # rubocop: disable CodeReuse/ActiveRecord job = CommitStatus.find_by_id(job_id)
return unless job return unless job
pipeline = job.pipeline job.expire_etag_cache!
project = job.project ExpirePipelineCacheWorker.perform_async(job.pipeline_id)
Gitlab::EtagCaching::Store.new.touch(project_job_path(project, job))
ExpirePipelineCacheWorker.perform_async(pipeline.id)
end
private
def project_job_path(project, job)
Gitlab::Routing.url_helpers.project_build_path(project, job.id, format: :json)
end end
end end
---
name: expire_job_and_pipeline_cache_synchronously
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/75611
rollout_issue_url: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1380
milestone: '14.6'
type: development
group: group::project management
default_enabled: false
...@@ -12,14 +12,18 @@ module Gitlab ...@@ -12,14 +12,18 @@ module Gitlab
Gitlab::Redis::SharedState.with { |redis| redis.get(redis_shared_state_key(key)) } Gitlab::Redis::SharedState.with { |redis| redis.get(redis_shared_state_key(key)) }
end end
def touch(key, only_if_missing: false) def touch(*keys, only_if_missing: false)
etag = generate_etag etags = keys.map { generate_etag }
Gitlab::Redis::SharedState.with do |redis| Gitlab::Redis::SharedState.with do |redis|
redis.set(redis_shared_state_key(key), etag, ex: EXPIRY_TIME, nx: only_if_missing) redis.pipelined do
keys.each_with_index do |key, i|
redis.set(redis_shared_state_key(key), etags[i], ex: EXPIRY_TIME, nx: only_if_missing)
end
end
end end
etag keys.size > 1 ? etags : etags.first
end end
private private
......
...@@ -80,5 +80,19 @@ RSpec.describe Gitlab::EtagCaching::Store, :clean_gitlab_redis_shared_state do ...@@ -80,5 +80,19 @@ RSpec.describe Gitlab::EtagCaching::Store, :clean_gitlab_redis_shared_state do
expect(store.get(key)).to eq(etag) expect(store.get(key)).to eq(etag)
end end
end end
context 'with multiple keys' do
let(:keys) { ['/my-group/my-project/builds/234.json', '/api/graphql:pipelines/id/5'] }
it 'stores and returns multiple values' do
etags = store.touch(*keys)
expect(etags.size).to eq(keys.size)
keys.each_with_index do |key, i|
expect(store.get(key)).to eq(etags[i])
end
end
end
end end
end end
...@@ -1503,12 +1503,32 @@ RSpec.describe Ci::Pipeline, :mailer, factory_default: :keep do ...@@ -1503,12 +1503,32 @@ RSpec.describe Ci::Pipeline, :mailer, factory_default: :keep do
end end
describe 'pipeline caching' do describe 'pipeline caching' do
context 'when expire_job_and_pipeline_cache_synchronously is enabled' do
before do
stub_feature_flags(expire_job_and_pipeline_cache_synchronously: true)
end
it 'executes Ci::ExpirePipelineCacheService' do
expect_next_instance_of(Ci::ExpirePipelineCacheService) do |service|
expect(service).to receive(:execute).with(pipeline)
end
pipeline.cancel
end
end
context 'when expire_job_and_pipeline_cache_synchronously is disabled' do
before do
stub_feature_flags(expire_job_and_pipeline_cache_synchronously: false)
end
it 'performs ExpirePipelinesCacheWorker' do it 'performs ExpirePipelinesCacheWorker' do
expect(ExpirePipelineCacheWorker).to receive(:perform_async).with(pipeline.id) expect(ExpirePipelineCacheWorker).to receive(:perform_async).with(pipeline.id)
pipeline.cancel pipeline.cancel
end end
end end
end
describe '#dangling?' do describe '#dangling?' do
it 'returns true if pipeline comes from any dangling sources' do it 'returns true if pipeline comes from any dangling sources' do
......
...@@ -46,11 +46,29 @@ RSpec.describe CommitStatus do ...@@ -46,11 +46,29 @@ RSpec.describe CommitStatus do
describe 'status state machine' do describe 'status state machine' do
let!(:commit_status) { create(:commit_status, :running, project: project) } let!(:commit_status) { create(:commit_status, :running, project: project) }
context 'when expire_job_and_pipeline_cache_synchronously is enabled' do
before do
stub_feature_flags(expire_job_and_pipeline_cache_synchronously: true)
end
it 'invalidates the cache after a transition' do
expect(commit_status).to receive(:expire_etag_cache!)
commit_status.success!
end
end
context 'when expire_job_and_pipeline_cache_synchronously is disabled' do
before do
stub_feature_flags(expire_job_and_pipeline_cache_synchronously: false)
end
it 'invalidates the cache after a transition' do it 'invalidates the cache after a transition' do
expect(ExpireJobCacheWorker).to receive(:perform_async).with(commit_status.id) expect(ExpireJobCacheWorker).to receive(:perform_async).with(commit_status.id)
commit_status.success! commit_status.success!
end end
end
describe 'transitioning to running' do describe 'transitioning to running' do
let(:commit_status) { create(:commit_status, :pending, started_at: nil) } let(:commit_status) { create(:commit_status, :pending, started_at: nil) }
...@@ -949,4 +967,15 @@ RSpec.describe CommitStatus do ...@@ -949,4 +967,15 @@ RSpec.describe CommitStatus do
described_class.bulk_insert_tags!(statuses, tag_list_by_build) described_class.bulk_insert_tags!(statuses, tag_list_by_build)
end end
end end
describe '#expire_etag_cache!' do
it 'expires the etag cache' do
expect_next_instance_of(Gitlab::EtagCaching::Store) do |etag_store|
job_path = Gitlab::Routing.url_helpers.project_build_path(project, commit_status.id, format: :json)
expect(etag_store).to receive(:touch).with(job_path)
end
commit_status.expire_etag_cache!
end
end
end end
...@@ -18,14 +18,14 @@ RSpec.describe Ci::ExpirePipelineCacheService do ...@@ -18,14 +18,14 @@ RSpec.describe Ci::ExpirePipelineCacheService do
graphql_pipeline_sha_path = "/api/graphql:pipelines/sha/#{pipeline.sha}" graphql_pipeline_sha_path = "/api/graphql:pipelines/sha/#{pipeline.sha}"
graphql_project_on_demand_scan_counts_path = "/api/graphql:on_demand_scan/counts/#{project.full_path}" graphql_project_on_demand_scan_counts_path = "/api/graphql:on_demand_scan/counts/#{project.full_path}"
expect_next_instance_of(Gitlab::EtagCaching::Store) do |store| expect_touched_etag_caching_paths(
expect(store).to receive(:touch).with(pipelines_path) pipelines_path,
expect(store).to receive(:touch).with(new_mr_pipelines_path) new_mr_pipelines_path,
expect(store).to receive(:touch).with(pipeline_path) pipeline_path,
expect(store).to receive(:touch).with(graphql_pipeline_path) graphql_pipeline_path,
expect(store).to receive(:touch).with(graphql_pipeline_sha_path) graphql_pipeline_sha_path,
expect(store).to receive(:touch).with(graphql_project_on_demand_scan_counts_path) graphql_project_on_demand_scan_counts_path
end )
subject.execute(pipeline) subject.execute(pipeline)
end end
...@@ -37,9 +37,10 @@ RSpec.describe Ci::ExpirePipelineCacheService do ...@@ -37,9 +37,10 @@ RSpec.describe Ci::ExpirePipelineCacheService do
merge_request_pipelines_path = "/#{project.full_path}/-/merge_requests/#{merge_request.iid}/pipelines.json" merge_request_pipelines_path = "/#{project.full_path}/-/merge_requests/#{merge_request.iid}/pipelines.json"
merge_request_widget_path = "/#{project.full_path}/-/merge_requests/#{merge_request.iid}/cached_widget.json" merge_request_widget_path = "/#{project.full_path}/-/merge_requests/#{merge_request.iid}/cached_widget.json"
allow_any_instance_of(Gitlab::EtagCaching::Store).to receive(:touch) expect_touched_etag_caching_paths(
expect_any_instance_of(Gitlab::EtagCaching::Store).to receive(:touch).with(merge_request_pipelines_path) merge_request_pipelines_path,
expect_any_instance_of(Gitlab::EtagCaching::Store).to receive(:touch).with(merge_request_widget_path) merge_request_widget_path
)
subject.execute(merge_request.all_pipelines.last) subject.execute(merge_request.all_pipelines.last)
end end
...@@ -78,10 +79,7 @@ RSpec.describe Ci::ExpirePipelineCacheService do ...@@ -78,10 +79,7 @@ RSpec.describe Ci::ExpirePipelineCacheService do
it 'updates the cache of dependent pipeline' do it 'updates the cache of dependent pipeline' do
dependent_pipeline_path = "/#{source.source_project.full_path}/-/pipelines/#{source.source_pipeline.id}.json" dependent_pipeline_path = "/#{source.source_project.full_path}/-/pipelines/#{source.source_pipeline.id}.json"
expect_next_instance_of(Gitlab::EtagCaching::Store) do |store| expect_touched_etag_caching_paths(dependent_pipeline_path)
allow(store).to receive(:touch)
expect(store).to receive(:touch).with(dependent_pipeline_path)
end
subject.execute(pipeline) subject.execute(pipeline)
end end
...@@ -94,12 +92,30 @@ RSpec.describe Ci::ExpirePipelineCacheService do ...@@ -94,12 +92,30 @@ RSpec.describe Ci::ExpirePipelineCacheService do
it 'updates the cache of dependent pipeline' do it 'updates the cache of dependent pipeline' do
dependent_pipeline_path = "/#{source.project.full_path}/-/pipelines/#{source.pipeline.id}.json" dependent_pipeline_path = "/#{source.project.full_path}/-/pipelines/#{source.pipeline.id}.json"
expect_next_instance_of(Gitlab::EtagCaching::Store) do |store| expect_touched_etag_caching_paths(dependent_pipeline_path)
allow(store).to receive(:touch)
expect(store).to receive(:touch).with(dependent_pipeline_path) subject.execute(pipeline)
end
end end
it 'does not do N+1 queries' do
subject.execute(pipeline) subject.execute(pipeline)
control = ActiveRecord::QueryRecorder.new { subject.execute(pipeline) }
create(:ci_sources_pipeline, pipeline: pipeline)
create(:ci_sources_pipeline, source_job: create(:ci_build, pipeline: pipeline))
expect { subject.execute(pipeline) }.not_to exceed_query_limit(control.count)
end
end
def expect_touched_etag_caching_paths(*paths)
expect_next_instance_of(Gitlab::EtagCaching::Store) do |store|
expect(store).to receive(:touch).and_wrap_original do |m, *args|
expect(args).to include(*paths)
m.call(*args)
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment