Commit d1eda393 authored by Grzegorz Bizon's avatar Grzegorz Bizon

Merge branch 'backstage/gb/rename-ci-cd-processing-sidekiq-queues' into 'master'

Rename CI/CD related Sidekiq queues

Closes #35532

See merge request !13714
parents 922eb6d3 15ace6a9
class BuildCoverageWorker
include Sidekiq::Worker
include BuildQueue
include PipelineQueue
def perform(build_id)
Ci::Build.find_by(id: build_id)&.update_coverage
......
class BuildFinishedWorker
include Sidekiq::Worker
include BuildQueue
include PipelineQueue
enqueue_in group: :processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
......
class BuildHooksWorker
include Sidekiq::Worker
include BuildQueue
include PipelineQueue
enqueue_in group: :hooks
def perform(build_id)
Ci::Build.find_by(id: build_id)
......
class BuildQueueWorker
include Sidekiq::Worker
include BuildQueue
include PipelineQueue
enqueue_in group: :processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
......
class BuildSuccessWorker
include Sidekiq::Worker
include BuildQueue
include PipelineQueue
enqueue_in group: :processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
......
# Concern for setting Sidekiq settings for the various CI build workers.
module BuildQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :build
end
end
##
# Concern for setting Sidekiq settings for the various CI pipeline workers.
#
module PipelineQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :pipeline
sidekiq_options queue: 'pipeline_default'
end
class_methods do
def enqueue_in(group:)
raise ArgumentError, 'Unspecified queue group!' if group.empty?
sidekiq_options queue: "pipeline_#{group}"
end
end
end
class ExpireJobCacheWorker
include Sidekiq::Worker
include BuildQueue
include PipelineQueue
enqueue_in group: :cache
def perform(job_id)
job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
......
......@@ -2,6 +2,8 @@ class ExpirePipelineCacheWorker
include Sidekiq::Worker
include PipelineQueue
enqueue_in group: :cache
def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id)
return unless pipeline
......
......@@ -2,6 +2,8 @@ class PipelineHooksWorker
include Sidekiq::Worker
include PipelineQueue
enqueue_in group: :hooks
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
.try(:execute_hooks)
......
......@@ -2,6 +2,8 @@ class PipelineProcessWorker
include Sidekiq::Worker
include PipelineQueue
enqueue_in group: :processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
.try(:process!)
......
......@@ -2,6 +2,8 @@ class PipelineSuccessWorker
include Sidekiq::Worker
include PipelineQueue
enqueue_in group: :processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
MergeRequests::MergeWhenPipelineSucceedsService
......
......@@ -2,6 +2,8 @@ class PipelineUpdateWorker
include Sidekiq::Worker
include PipelineQueue
enqueue_in group: :processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
.try(:update_status)
......
......@@ -2,6 +2,8 @@ class StageUpdateWorker
include Sidekiq::Worker
include PipelineQueue
enqueue_in group: :processing
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
stage.update_status
......
......@@ -27,6 +27,10 @@
- [new_merge_request, 2]
- [build, 2]
- [pipeline, 2]
- [pipeline_processing, 5]
- [pipeline_default, 3]
- [pipeline_cache, 3]
- [pipeline_hooks, 2]
- [gitlab_shell, 2]
- [email_receiver, 2]
- [emails_on_push, 2]
......
class MigratePipelineSidekiqQueues < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def up
sidekiq_queue_migrate 'build', to: 'pipeline_default'
sidekiq_queue_migrate 'pipeline', to: 'pipeline_default'
end
def down
sidekiq_queue_migrate 'pipeline_default', to: 'pipeline'
sidekiq_queue_migrate 'pipeline_processing', to: 'pipeline'
sidekiq_queue_migrate 'pipeline_hooks', to: 'pipeline'
sidekiq_queue_migrate 'pipeline_cache', to: 'pipeline'
end
end
......@@ -611,6 +611,20 @@ module Gitlab
remove_foreign_key(*args)
rescue ArgumentError
end
def sidekiq_queue_migrate(queue_from, to:)
while sidekiq_queue_length(queue_from) > 0
Sidekiq.redis do |conn|
conn.rpoplpush "queue:#{queue_from}", "queue:#{to}"
end
end
end
def sidekiq_queue_length(queue_name)
Sidekiq.redis do |conn|
conn.llen("queue:#{queue_name}")
end
end
end
end
end
......@@ -2,9 +2,7 @@ require 'spec_helper'
describe Gitlab::Database::MigrationHelpers do
let(:model) do
ActiveRecord::Migration.new.extend(
described_class
)
ActiveRecord::Migration.new.extend(described_class)
end
before do
......@@ -845,4 +843,51 @@ describe Gitlab::Database::MigrationHelpers do
end
end
end
describe 'sidekiq migration helpers', :sidekiq, :redis do
let(:worker) do
Class.new do
include Sidekiq::Worker
sidekiq_options queue: 'test'
end
end
describe '#sidekiq_queue_length' do
context 'when queue is empty' do
it 'returns zero' do
Sidekiq::Testing.disable! do
expect(model.sidekiq_queue_length('test')).to eq 0
end
end
end
context 'when queue contains jobs' do
it 'returns correct size of the queue' do
Sidekiq::Testing.disable! do
worker.perform_async('Something', [1])
worker.perform_async('Something', [2])
expect(model.sidekiq_queue_length('test')).to eq 2
end
end
end
end
describe '#migrate_sidekiq_queue' do
it 'migrates jobs from one sidekiq queue to another' do
Sidekiq::Testing.disable! do
worker.perform_async('Something', [1])
worker.perform_async('Something', [2])
expect(model.sidekiq_queue_length('test')).to eq 2
expect(model.sidekiq_queue_length('new_test')).to eq 0
model.sidekiq_queue_migrate('test', to: 'new_test')
expect(model.sidekiq_queue_length('test')).to eq 0
expect(model.sidekiq_queue_length('new_test')).to eq 2
end
end
end
end
end
require 'spec_helper'
require Rails.root.join('db', 'post_migrate', '20170822101017_migrate_pipeline_sidekiq_queues.rb')
describe MigratePipelineSidekiqQueues, :sidekiq, :redis do
include Gitlab::Database::MigrationHelpers
context 'when there are jobs in the queues' do
it 'correctly migrates queue when migrating up' do
Sidekiq::Testing.disable! do
stubbed_worker(queue: :pipeline).perform_async('Something', [1])
stubbed_worker(queue: :build).perform_async('Something', [1])
described_class.new.up
expect(sidekiq_queue_length('pipeline')).to eq 0
expect(sidekiq_queue_length('build')).to eq 0
expect(sidekiq_queue_length('pipeline_default')).to eq 2
end
end
it 'correctly migrates queue when migrating down' do
Sidekiq::Testing.disable! do
stubbed_worker(queue: :pipeline_default).perform_async('Class', [1])
stubbed_worker(queue: :pipeline_processing).perform_async('Class', [2])
stubbed_worker(queue: :pipeline_hooks).perform_async('Class', [3])
stubbed_worker(queue: :pipeline_cache).perform_async('Class', [4])
described_class.new.down
expect(sidekiq_queue_length('pipeline')).to eq 4
expect(sidekiq_queue_length('pipeline_default')).to eq 0
expect(sidekiq_queue_length('pipeline_processing')).to eq 0
expect(sidekiq_queue_length('pipeline_hooks')).to eq 0
expect(sidekiq_queue_length('pipeline_cache')).to eq 0
end
end
end
context 'when there are no jobs in the queues' do
it 'does not raise error when migrating up' do
expect { described_class.new.up }.not_to raise_error
end
it 'does not raise error when migrating down' do
expect { described_class.new.down }.not_to raise_error
end
end
def stubbed_worker(queue:)
Class.new do
include Sidekiq::Worker
sidekiq_options queue: queue
end
end
end
require 'spec_helper'
describe BuildQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include BuildQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('build')
end
end
......@@ -8,7 +8,17 @@ describe PipelineQueue do
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('pipeline')
it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_default'
end
describe '.enqueue_in' do
it 'sets a custom sidekiq queue with prefix and group' do
worker.enqueue_in(group: :processing)
expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_processing'
end
end
end
......@@ -2,7 +2,12 @@ require 'spec_helper'
describe PipelineMetricsWorker do
let(:project) { create(:project, :repository) }
let!(:merge_request) { create(:merge_request, source_project: project, source_branch: pipeline.ref, head_pipeline: pipeline) }
let!(:merge_request) do
create(:merge_request, source_project: project,
source_branch: pipeline.ref,
head_pipeline: pipeline)
end
let(:pipeline) do
create(:ci_empty_pipeline,
......@@ -14,6 +19,8 @@ describe PipelineMetricsWorker do
finished_at: Time.now)
end
let(:status) { 'pending' }
describe '#perform' do
before do
described_class.new.perform(pipeline.id)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment