Commit 61be6b32 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch '32258-sidekiq-report-actual-concurrency' into 'master'

More fine-grained metrics for sidekiq workers

Closes #32258

See merge request gitlab-org/gitlab!18817
parents a64354db 73b14405
...@@ -64,14 +64,16 @@ module Gitlab ...@@ -64,14 +64,16 @@ module Gitlab
# directory - The directory of the Rails application. # directory - The directory of the Rails application.
# #
# Returns an Array containing the PIDs of the started processes. # Returns an Array containing the PIDs of the started processes.
def self.start(queues, env, directory = Dir.pwd, max_concurrency = 50, dryrun: false) def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 50, dryrun: false)
queues.map { |pair| start_sidekiq(pair, env, directory, max_concurrency, dryrun: dryrun) } queues.map.with_index do |pair, index|
start_sidekiq(pair, env: env, directory: directory, max_concurrency: max_concurrency, worker_id: index, dryrun: dryrun)
end
end end
# Starts a Sidekiq process that processes _only_ the given queues. # Starts a Sidekiq process that processes _only_ the given queues.
# #
# Returns the PID of the started process. # Returns the PID of the started process.
def self.start_sidekiq(queues, env, directory = Dir.pwd, max_concurrency = 50, dryrun: false) def self.start_sidekiq(queues, env:, directory:, max_concurrency:, worker_id:, dryrun:)
counts = count_by_queue(queues) counts = count_by_queue(queues)
cmd = %w[bundle exec sidekiq] cmd = %w[bundle exec sidekiq]
...@@ -90,7 +92,8 @@ module Gitlab ...@@ -90,7 +92,8 @@ module Gitlab
end end
pid = Process.spawn( pid = Process.spawn(
{ 'ENABLE_SIDEKIQ_CLUSTER' => '1' }, { 'ENABLE_SIDEKIQ_CLUSTER' => '1',
'SIDEKIQ_WORKER_ID' => worker_id.to_s },
*cmd, *cmd,
pgroup: true, pgroup: true,
err: $stderr, err: $stderr,
......
...@@ -49,7 +49,8 @@ module Gitlab ...@@ -49,7 +49,8 @@ module Gitlab
@logger.info("Starting cluster with #{queue_groups.length} processes") @logger.info("Starting cluster with #{queue_groups.length} processes")
@processes = SidekiqCluster.start(queue_groups, @environment, @rails_path, @max_concurrency, dryrun: @dryrun) @processes = SidekiqCluster.start(queue_groups, env: @environment, directory: @rails_path,
max_concurrency: @max_concurrency, dryrun: @dryrun)
return if @dryrun return if @dryrun
......
...@@ -4,6 +4,9 @@ require 'spec_helper' ...@@ -4,6 +4,9 @@ require 'spec_helper'
describe Gitlab::SidekiqCluster::CLI do describe Gitlab::SidekiqCluster::CLI do
let(:cli) { described_class.new('/dev/null') } let(:cli) { described_class.new('/dev/null') }
let(:default_options) do
{ env: 'test', directory: Dir.pwd, max_concurrency: 50, dryrun: false }
end
describe '#run' do describe '#run' do
context 'without any arguments' do context 'without any arguments' do
...@@ -21,7 +24,7 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -21,7 +24,7 @@ describe Gitlab::SidekiqCluster::CLI do
it 'starts the Sidekiq workers' do it 'starts the Sidekiq workers' do
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['foo']], 'test', Dir.pwd, 50, dryrun: false) .with([['foo']], default_options)
.and_return([]) .and_return([])
cli.run(%w(foo)) cli.run(%w(foo))
...@@ -31,7 +34,7 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -31,7 +34,7 @@ describe Gitlab::SidekiqCluster::CLI do
it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do
expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['baz']) expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['baz'])
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['baz']], 'test', Dir.pwd, 50, dryrun: false) .with([['baz']], default_options)
.and_return([]) .and_return([])
cli.run(%w(foo -n)) cli.run(%w(foo -n))
...@@ -42,7 +45,7 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -42,7 +45,7 @@ describe Gitlab::SidekiqCluster::CLI do
it 'starts Sidekiq workers for specified queues with a max concurrency' do it 'starts Sidekiq workers for specified queues with a max concurrency' do
expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(%w(foo bar baz)) expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(%w(foo bar baz))
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([%w(foo bar baz), %w(solo)], 'test', Dir.pwd, 2, dryrun: false) .with([%w(foo bar baz), %w(solo)], default_options.merge(max_concurrency: 2))
.and_return([]) .and_return([])
cli.run(%w(foo,bar,baz solo -m 2)) cli.run(%w(foo,bar,baz solo -m 2))
...@@ -53,7 +56,7 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -53,7 +56,7 @@ describe Gitlab::SidekiqCluster::CLI do
it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do
expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar']) expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar'])
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['cronjob', 'cronjob:foo', 'cronjob:bar']], 'test', Dir.pwd, 50, dryrun: false) .with([['cronjob', 'cronjob:foo', 'cronjob:bar']], default_options)
.and_return([]) .and_return([])
cli.run(%w(cronjob)) cli.run(%w(cronjob))
......
...@@ -58,36 +58,44 @@ describe Gitlab::SidekiqCluster do ...@@ -58,36 +58,44 @@ describe Gitlab::SidekiqCluster do
end end
describe '.start' do describe '.start' do
it 'starts Sidekiq with the given queues and environment' do it 'starts Sidekiq with the given queues, environment and options' do
expect(described_class).to receive(:start_sidekiq) expected_options = { env: :production, directory: 'foo/bar', max_concurrency: 20, dryrun: true }
.ordered.with(%w(foo), :production, 'foo/bar', 50, dryrun: false)
expect(described_class).to receive(:start_sidekiq) expect(described_class).to receive(:start_sidekiq).ordered.with(%w(foo), expected_options.merge(worker_id: 0))
.ordered.with(%w(bar baz), :production, 'foo/bar', 50, dryrun: false) expect(described_class).to receive(:start_sidekiq).ordered.with(%w(bar baz), expected_options.merge(worker_id: 1))
described_class.start([%w(foo), %w(bar baz)], :production, 'foo/bar', 50) described_class.start([%w(foo), %w(bar baz)], env: :production, directory: 'foo/bar', max_concurrency: 20, dryrun: true)
end end
it 'starts Sidekiq with capped concurrency limits for each queue' do it 'starts Sidekiq with the given queues and sensible default options' do
expect(described_class).to receive(:start_sidekiq) expected_options = {
.ordered.with(%w(foo bar baz), :production, 'foo/bar', 2, dryrun: false) env: :development,
directory: an_instance_of(String),
max_concurrency: 50,
worker_id: an_instance_of(Integer),
dryrun: false
}
expect(described_class).to receive(:start_sidekiq) expect(described_class).to receive(:start_sidekiq).ordered.with(%w(foo bar baz), expected_options)
.ordered.with(%w(solo), :production, 'foo/bar', 2, dryrun: false) expect(described_class).to receive(:start_sidekiq).ordered.with(%w(solo), expected_options)
described_class.start([%w(foo bar baz), %w(solo)], :production, 'foo/bar', 2) described_class.start([%w(foo bar baz), %w(solo)])
end end
end end
describe '.start_sidekiq' do describe '.start_sidekiq' do
let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1" } } let(:first_worker_id) { 0 }
let(:options) do
{ env: :production, directory: 'foo/bar', max_concurrency: 20, worker_id: first_worker_id, dryrun: false }
end
let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => first_worker_id.to_s } }
let(:args) { ['bundle', 'exec', 'sidekiq', anything, '-eproduction', *([anything] * 5)] } let(:args) { ['bundle', 'exec', 'sidekiq', anything, '-eproduction', *([anything] * 5)] }
it 'starts a Sidekiq process' do it 'starts a Sidekiq process' do
allow(Process).to receive(:spawn).and_return(1) allow(Process).to receive(:spawn).and_return(1)
expect(described_class).to receive(:wait_async).with(1) expect(described_class).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo), :production)).to eq(1) expect(described_class.start_sidekiq(%w(foo), options)).to eq(1)
end end
it 'handles duplicate queue names' do it 'handles duplicate queue names' do
...@@ -97,7 +105,7 @@ describe Gitlab::SidekiqCluster do ...@@ -97,7 +105,7 @@ describe Gitlab::SidekiqCluster do
.and_return(1) .and_return(1)
expect(described_class).to receive(:wait_async).with(1) expect(described_class).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo foo bar baz), :production)).to eq(1) expect(described_class.start_sidekiq(%w(foo foo bar baz), options)).to eq(1)
end end
it 'runs the sidekiq process in a new process group' do it 'runs the sidekiq process in a new process group' do
...@@ -107,7 +115,7 @@ describe Gitlab::SidekiqCluster do ...@@ -107,7 +115,7 @@ describe Gitlab::SidekiqCluster do
.and_return(1) .and_return(1)
allow(described_class).to receive(:wait_async) allow(described_class).to receive(:wait_async)
expect(described_class.start_sidekiq(%w(foo bar baz), :production)).to eq(1) expect(described_class.start_sidekiq(%w(foo bar baz), options)).to eq(1)
end end
end end
......
...@@ -9,6 +9,8 @@ module Gitlab ...@@ -9,6 +9,8 @@ module Gitlab
def initialize def initialize
@metrics = init_metrics @metrics = init_metrics
@metrics[:sidekiq_concurrency].set({}, Sidekiq.options[:concurrency].to_i)
end end
def call(_worker, job, queue) def call(_worker, job, queue)
...@@ -45,7 +47,8 @@ module Gitlab ...@@ -45,7 +47,8 @@ module Gitlab
sidekiq_jobs_completion_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_completion_seconds, 'Seconds to complete sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), sidekiq_jobs_completion_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_completion_seconds, 'Seconds to complete sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
sidekiq_jobs_failed_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_failed_total, 'Sidekiq jobs failed'), sidekiq_jobs_failed_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_failed_total, 'Sidekiq jobs failed'),
sidekiq_jobs_retried_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_retried_total, 'Sidekiq jobs retried'), sidekiq_jobs_retried_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_retried_total, 'Sidekiq jobs retried'),
sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :livesum) sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :all),
sidekiq_concurrency: ::Gitlab::Metrics.gauge(:sidekiq_concurrency, 'Maximum number of Sidekiq jobs', {}, :all)
} }
end end
......
...@@ -6,7 +6,7 @@ module Prometheus ...@@ -6,7 +6,7 @@ module Prometheus
def worker_id def worker_id
if Sidekiq.server? if Sidekiq.server?
'sidekiq' sidekiq_worker_id
elsif defined?(Unicorn::Worker) elsif defined?(Unicorn::Worker)
unicorn_worker_id unicorn_worker_id
elsif defined?(::Puma) elsif defined?(::Puma)
...@@ -18,6 +18,14 @@ module Prometheus ...@@ -18,6 +18,14 @@ module Prometheus
private private
def sidekiq_worker_id
if worker = ENV['SIDEKIQ_WORKER_ID']
"sidekiq_#{worker}"
else
'sidekiq'
end
end
def unicorn_worker_id def unicorn_worker_id
if matches = process_name.match(/unicorn.*worker\[([0-9]+)\]/) if matches = process_name.match(/unicorn.*worker\[([0-9]+)\]/)
"unicorn_#{matches[1]}" "unicorn_#{matches[1]}"
......
...@@ -3,25 +3,37 @@ ...@@ -3,25 +3,37 @@
require 'spec_helper' require 'spec_helper'
describe Gitlab::SidekiqMiddleware::Metrics do describe Gitlab::SidekiqMiddleware::Metrics do
describe '#call' do let(:middleware) { described_class.new }
let(:middleware) { described_class.new }
let(:worker) { double(:worker) } let(:concurrency_metric) { double('concurrency metric') }
let(:completion_seconds_metric) { double('completion seconds metric') }
let(:completion_seconds_metric) { double('completion seconds metric') } let(:user_execution_seconds_metric) { double('user execution seconds metric') }
let(:user_execution_seconds_metric) { double('user execution seconds metric') } let(:failed_total_metric) { double('failed total metric') }
let(:failed_total_metric) { double('failed total metric') } let(:retried_total_metric) { double('retried total metric') }
let(:retried_total_metric) { double('retried total metric') } let(:running_jobs_metric) { double('running jobs metric') }
let(:running_jobs_metric) { double('running jobs metric') }
before do
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_completion_seconds, anything, anything, anything).and_return(completion_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_cpu_seconds, anything, anything, anything).and_return(user_execution_seconds_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_failed_total, anything).and_return(failed_total_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_retried_total, anything).and_return(retried_total_metric)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_running_jobs, anything, {}, :all).and_return(running_jobs_metric)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_concurrency, anything, {}, :all).and_return(concurrency_metric)
allow(running_jobs_metric).to receive(:increment)
allow(concurrency_metric).to receive(:set)
end
before do describe '#initialize' do
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_completion_seconds, anything, anything, anything).and_return(completion_seconds_metric) it 'sets general metrics' do
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_cpu_seconds, anything, anything, anything).and_return(user_execution_seconds_metric) expect(concurrency_metric).to receive(:set).with({}, Sidekiq.options[:concurrency].to_i)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_failed_total, anything).and_return(failed_total_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_retried_total, anything).and_return(retried_total_metric)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_running_jobs, anything, {}, :livesum).and_return(running_jobs_metric)
allow(running_jobs_metric).to receive(:increment) middleware
end end
end
describe '#call' do
let(:worker) { double(:worker) }
it 'yields block' do it 'yields block' do
allow(completion_seconds_metric).to receive(:observe) allow(completion_seconds_metric).to receive(:observe)
...@@ -30,7 +42,7 @@ describe Gitlab::SidekiqMiddleware::Metrics do ...@@ -30,7 +42,7 @@ describe Gitlab::SidekiqMiddleware::Metrics do
expect { |b| middleware.call(worker, {}, :test, &b) }.to yield_control.once expect { |b| middleware.call(worker, {}, :test, &b) }.to yield_control.once
end end
it 'sets metrics' do it 'sets queue specific metrics' do
labels = { queue: :test } labels = { queue: :test }
allow(middleware).to receive(:get_thread_cputime).and_return(1, 3) allow(middleware).to receive(:get_thread_cputime).and_return(1, 3)
......
...@@ -18,7 +18,17 @@ describe Prometheus::PidProvider do ...@@ -18,7 +18,17 @@ describe Prometheus::PidProvider do
expect(Sidekiq).to receive(:server?).and_return(true) expect(Sidekiq).to receive(:server?).and_return(true)
end end
it { is_expected.to eq 'sidekiq' } context 'in a clustered setup' do
before do
stub_env('SIDEKIQ_WORKER_ID', '123')
end
it { is_expected.to eq 'sidekiq_123' }
end
context 'in a single process setup' do
it { is_expected.to eq 'sidekiq' }
end
end end
context 'when running in Unicorn mode' do context 'when running in Unicorn mode' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment