Commit 926bb603 authored by Nikola Milojevic's avatar Nikola Milojevic Committed by Kamil Trzciński

Add load balancing Prometheus metrics in server_metrics middleware

parent 66ce587c
...@@ -250,10 +250,11 @@ configuration option in `gitlab.yml`. These metrics are served from the ...@@ -250,10 +250,11 @@ configuration option in `gitlab.yml`. These metrics are served from the
The following metrics are available: The following metrics are available:
| Metric | Type | Since | Description | | Metric | Type | Since | Description | Labels |
|:--------------------------------- |:--------- |:------------------------------------------------------------- |:-------------------------------------- | |:--------------------------------- |:--------- |:------------------------------------------------------------- |:-------------------------------------- |:--------------------------------------------------------- |
| `db_load_balancing_hosts` | Gauge | [12.3](https://gitlab.com/gitlab-org/gitlab/-/issues/13630) | Current number of load balancing hosts | | `db_load_balancing_hosts` | Gauge | [12.3](https://gitlab.com/gitlab-org/gitlab/-/issues/13630) | Current number of load balancing hosts | |
| `sidekiq_load_balancing_count` | Counter | 13.11 | Sidekiq jobs using load balancing with data consistency set to :sticky or :delayed | `queue`, `boundary`, `external_dependencies`, `feature_category`, `job_status`, `urgency`, `data_consistency`, `database_chosen` |
## Database partitioning metrics **(PREMIUM SELF)** ## Database partitioning metrics **(PREMIUM SELF)**
The following metrics are available: The following metrics are available:
......
---
title: Add load balancing Sidekiq metrics
merge_request: 58473
author:
type: other
# frozen_string_literal: true
module EE
module Gitlab
module SidekiqMiddleware
module ServerMetrics
extend ::Gitlab::Utils::Override
protected
override :init_metrics
def init_metrics
super.merge(init_load_balancing_metrics)
end
override :instrument
def instrument(job, labels)
super
ensure
record_load_balancing(job, labels)
end
private
def init_load_balancing_metrics
return {} unless ::Gitlab::Database::LoadBalancing.enable?
{
sidekiq_load_balancing_count: ::Gitlab::Metrics.counter(:sidekiq_load_balancing_count, 'Sidekiq jobs with load balancing')
}
end
def record_load_balancing(job, labels)
return unless ::Gitlab::Database::LoadBalancing.enable?
return unless job[:database_chosen]
load_balancing_labels = {
database_chosen: job[:database_chosen],
data_consistency: job[:data_consistency]
}
metrics[:sidekiq_load_balancing_count].increment(labels.merge(load_balancing_labels), 1)
end
end
end
end
end
...@@ -19,6 +19,9 @@ module Gitlab ...@@ -19,6 +19,9 @@ module Gitlab
return unless worker_class return unless worker_class
return unless worker_class.include?(::ApplicationWorker) return unless worker_class.include?(::ApplicationWorker)
return unless worker_class.get_data_consistency_feature_flag_enabled? return unless worker_class.get_data_consistency_feature_flag_enabled?
job['worker_data_consistency'] = worker_class.get_data_consistency
return if worker_class.get_data_consistency == :always return if worker_class.get_data_consistency == :always
if Session.current.performed_write? if Session.current.performed_write?
......
...@@ -25,18 +25,18 @@ module Gitlab ...@@ -25,18 +25,18 @@ module Gitlab
def requires_primary?(worker_class, job) def requires_primary?(worker_class, job)
return true unless worker_class.include?(::ApplicationWorker) return true unless worker_class.include?(::ApplicationWorker)
job[:worker_data_consistency] = worker_class.get_data_consistency
return true if worker_class.get_data_consistency == :always return true if worker_class.get_data_consistency == :always
return true unless worker_class.get_data_consistency_feature_flag_enabled? return true unless worker_class.get_data_consistency_feature_flag_enabled?
if job['database_replica_location'] || replica_caught_up?(job['database_write_location'] ) if job['database_replica_location'] || replica_caught_up?(job['database_write_location'])
job[:database_chosen] = 'replica'
false false
elsif worker_class.get_data_consistency == :delayed && job['retry_count'].to_i == 0 elsif worker_class.get_data_consistency == :delayed && job['retry_count'].to_i == 0
job[:database_chosen] = 'retry'
raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\ raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
" Replica was not up to date." " Replica was not up to date."
else else
job[:database_chosen] = 'primary'
true true
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
# rubocop: disable RSpec/MultipleMemoizedHelpers
RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
using RSpec::Parameterized::TableSyntax
subject { described_class.new }
let(:queue) { :test }
let(:worker_class) { worker.class }
let(:job) { {} }
let(:job_status) { :done }
let(:labels_with_job_status) { default_labels.merge(job_status: job_status.to_s) }
let(:default_labels) do
{ queue: queue.to_s,
worker: worker_class.to_s,
boundary: "",
external_dependencies: "no",
feature_category: "",
urgency: "low" }
end
before do
stub_const('TestWorker', Class.new)
TestWorker.class_eval do
include Sidekiq::Worker
include WorkerAttributes
end
end
let(:worker) { TestWorker.new }
include_context 'server metrics with mocked prometheus'
context 'when load_balancing is enabled' do
let(:load_balancing_metric) { double('load balancing metric') }
include_context 'clear DB Load Balancing configuration'
before do
allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(true)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_load_balancing_count, anything).and_return(load_balancing_metric)
end
describe '#initialize' do
it 'sets load_balancing metrics' do
expect(Gitlab::Metrics).to receive(:counter).with(:sidekiq_load_balancing_count, anything).and_return(load_balancing_metric)
subject
end
end
describe '#call' do
include_context 'server metrics call'
context 'when :database_chosen is provided' do
where(:database_chosen) do
%w[primary retry replica]
end
with_them do
context "when #{params[:database_chosen]} is used" do
let(:labels_with_load_balancing) do
labels_with_job_status.merge(database_chosen: database_chosen, data_consistency: 'delayed')
end
before do
job[:database_chosen] = database_chosen
job[:data_consistency] = 'delayed'
allow(load_balancing_metric).to receive(:increment)
end
it 'increment sidekiq_load_balancing_count' do
expect(load_balancing_metric).to receive(:increment).with(labels_with_load_balancing, 1)
described_class.new.call(worker, job, :test) { nil }
end
end
end
end
context 'when :database_chosen is not provided' do
it 'does not increment sidekiq_load_balancing_count' do
expect(load_balancing_metric).not_to receive(:increment)
described_class.new.call(worker, job, :test) { nil }
end
end
end
end
context 'when load_balancing is disabled' do
include_context 'clear DB Load Balancing configuration'
before do
allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(false)
end
describe '#initialize' do
it 'doesnt set load_balancing metrics' do
expect(Gitlab::Metrics).not_to receive(:counter).with(:sidekiq_load_balancing_count, anything)
subject
end
end
end
end
...@@ -13,7 +13,6 @@ module Gitlab ...@@ -13,7 +13,6 @@ module Gitlab
:elasticsearch_calls, :elasticsearch_calls,
:elasticsearch_duration_s, :elasticsearch_duration_s,
:elasticsearch_timed_out_count, :elasticsearch_timed_out_count,
:worker_data_consistency,
*::Gitlab::Memory::Instrumentation::KEY_MAPPING.values, *::Gitlab::Memory::Instrumentation::KEY_MAPPING.values,
*::Gitlab::Instrumentation::Redis.known_payload_keys, *::Gitlab::Instrumentation::Redis.known_payload_keys,
*::Gitlab::Metrics::Subscribers::ActiveRecord.known_payload_keys, *::Gitlab::Metrics::Subscribers::ActiveRecord.known_payload_keys,
......
...@@ -21,6 +21,16 @@ module Gitlab ...@@ -21,6 +21,16 @@ module Gitlab
Thread.current.name ||= Gitlab::Metrics::Samplers::ThreadsSampler::SIDEKIQ_WORKER_THREAD_NAME Thread.current.name ||= Gitlab::Metrics::Samplers::ThreadsSampler::SIDEKIQ_WORKER_THREAD_NAME
labels = create_labels(worker.class, queue, job) labels = create_labels(worker.class, queue, job)
instrument(job, labels) do
yield
end
end
protected
attr_reader :metrics
def instrument(job, labels)
queue_duration = ::Gitlab::InstrumentationHelper.queue_duration_for_job(job) queue_duration = ::Gitlab::InstrumentationHelper.queue_duration_for_job(job)
@metrics[:sidekiq_jobs_queue_duration_seconds].observe(labels, queue_duration) if queue_duration @metrics[:sidekiq_jobs_queue_duration_seconds].observe(labels, queue_duration) if queue_duration
...@@ -62,8 +72,6 @@ module Gitlab ...@@ -62,8 +72,6 @@ module Gitlab
end end
end end
private
def init_metrics def init_metrics
{ {
sidekiq_jobs_cpu_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_cpu_seconds, 'Seconds of cpu time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), sidekiq_jobs_cpu_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_cpu_seconds, 'Seconds of cpu time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
...@@ -82,6 +90,8 @@ module Gitlab ...@@ -82,6 +90,8 @@ module Gitlab
} }
end end
private
def get_thread_cputime def get_thread_cputime
defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID) : 0 defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID) : 0
end end
...@@ -108,3 +118,5 @@ module Gitlab ...@@ -108,3 +118,5 @@ module Gitlab
end end
end end
end end
Gitlab::SidekiqMiddleware::ServerMetrics.prepend_if_ee('EE::Gitlab::SidekiqMiddleware::ServerMetrics')
...@@ -3,156 +3,33 @@ ...@@ -3,156 +3,33 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::ClientMetrics do RSpec.describe Gitlab::SidekiqMiddleware::ClientMetrics do
context "with worker attribution" do shared_examples "a metrics middleware" do
subject { described_class.new } context "with mocked prometheus" do
let(:enqueued_jobs_metric) { double('enqueued jobs metric', increment: true) }
let(:queue) { :test }
let(:worker_class) { worker.class }
let(:job) { {} }
let(:default_labels) do
{ queue: queue.to_s,
worker: worker_class.to_s,
boundary: "",
external_dependencies: "no",
feature_category: "",
urgency: "low" }
end
shared_examples "a metrics client middleware" do
context "with mocked prometheus" do
let(:enqueued_jobs_metric) { double('enqueued jobs metric', increment: true) }
before do
allow(Gitlab::Metrics).to receive(:counter).with(described_class::ENQUEUED, anything).and_return(enqueued_jobs_metric)
end
describe '#call' do
it 'yields block' do
expect { |b| subject.call(worker_class, job, :test, double, &b) }.to yield_control.once
end
it 'increments enqueued jobs metric with correct labels when worker is a string of the class' do
expect(enqueued_jobs_metric).to receive(:increment).with(labels, 1)
subject.call(worker_class.to_s, job, :test, double) { nil }
end
it 'increments enqueued jobs metric with correct labels' do
expect(enqueued_jobs_metric).to receive(:increment).with(labels, 1)
subject.call(worker_class, job, :test, double) { nil }
end
end
end
end
context "when workers are not attributed" do
before do before do
stub_const('TestNonAttributedWorker', Class.new) allow(Gitlab::Metrics).to receive(:counter).with(described_class::ENQUEUED, anything).and_return(enqueued_jobs_metric)
TestNonAttributedWorker.class_eval do
include Sidekiq::Worker
end
end
it_behaves_like "a metrics client middleware" do
let(:worker) { TestNonAttributedWorker.new }
let(:labels) { default_labels.merge(urgency: "") }
end
end
context "when a worker is wrapped into ActiveJob" do
before do
stub_const('TestWrappedWorker', Class.new)
TestWrappedWorker.class_eval do
include Sidekiq::Worker
end
end
it_behaves_like "a metrics client middleware" do
let(:job) do
{
"class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
"wrapped" => TestWrappedWorker
}
end
let(:worker) { TestWrappedWorker.new }
let(:labels) { default_labels.merge(urgency: "") }
end
end
context "when workers are attributed" do
def create_attributed_worker_class(urgency, external_dependencies, resource_boundary, category)
klass = Class.new do
include Sidekiq::Worker
include WorkerAttributes
urgency urgency if urgency
worker_has_external_dependencies! if external_dependencies
worker_resource_boundary resource_boundary unless resource_boundary == :unknown
feature_category category unless category.nil?
end
stub_const("TestAttributedWorker", klass)
end
let(:urgency) { nil }
let(:external_dependencies) { false }
let(:resource_boundary) { :unknown }
let(:feature_category) { nil }
let(:worker_class) { create_attributed_worker_class(urgency, external_dependencies, resource_boundary, feature_category) }
let(:worker) { worker_class.new }
context "high urgency" do
it_behaves_like "a metrics client middleware" do
let(:urgency) { :high }
let(:labels) { default_labels.merge(urgency: "high") }
end
end end
context "no urgency" do describe '#call' do
it_behaves_like "a metrics client middleware" do it 'yields block' do
let(:urgency) { :throttled } expect { |b| subject.call(worker_class, job, :test, double, &b) }.to yield_control.once
let(:labels) { default_labels.merge(urgency: "throttled") }
end end
end
context "external dependencies" do it 'increments enqueued jobs metric with correct labels when worker is a string of the class' do
it_behaves_like "a metrics client middleware" do expect(enqueued_jobs_metric).to receive(:increment).with(labels, 1)
let(:external_dependencies) { true }
let(:labels) { default_labels.merge(external_dependencies: "yes") }
end
end
context "cpu boundary" do subject.call(worker_class.to_s, job, :test, double) { nil }
it_behaves_like "a metrics client middleware" do
let(:resource_boundary) { :cpu }
let(:labels) { default_labels.merge(boundary: "cpu") }
end end
end
context "memory boundary" do it 'increments enqueued jobs metric with correct labels' do
it_behaves_like "a metrics client middleware" do expect(enqueued_jobs_metric).to receive(:increment).with(labels, 1)
let(:resource_boundary) { :memory }
let(:labels) { default_labels.merge(boundary: "memory") }
end
end
context "feature category" do subject.call(worker_class, job, :test, double) { nil }
it_behaves_like "a metrics client middleware" do
let(:feature_category) { :authentication }
let(:labels) { default_labels.merge(feature_category: "authentication") }
end
end
context "combined" do
it_behaves_like "a metrics client middleware" do
let(:urgency) { :high }
let(:external_dependencies) { true }
let(:resource_boundary) { :cpu }
let(:feature_category) { :authentication }
let(:labels) { default_labels.merge(urgency: "high", external_dependencies: "yes", boundary: "cpu", feature_category: "authentication") }
end end
end end
end end
end end
it_behaves_like 'metrics middleware with worker attribution'
end end
...@@ -35,7 +35,6 @@ RSpec.describe Gitlab::SidekiqMiddleware::InstrumentationLogger do ...@@ -35,7 +35,6 @@ RSpec.describe Gitlab::SidekiqMiddleware::InstrumentationLogger do
:elasticsearch_calls, :elasticsearch_calls,
:elasticsearch_duration_s, :elasticsearch_duration_s,
:elasticsearch_timed_out_count, :elasticsearch_timed_out_count,
:worker_data_consistency,
:mem_objects, :mem_objects,
:mem_bytes, :mem_bytes,
:mem_mallocs, :mem_mallocs,
......
...@@ -4,309 +4,108 @@ require 'spec_helper' ...@@ -4,309 +4,108 @@ require 'spec_helper'
# rubocop: disable RSpec/MultipleMemoizedHelpers # rubocop: disable RSpec/MultipleMemoizedHelpers
RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
context "with worker attribution" do shared_examples "a metrics middleware" do
subject { described_class.new } context "with mocked prometheus" do
include_context 'server metrics with mocked prometheus'
let(:queue) { :test } describe '#initialize' do
let(:worker_class) { worker.class } it 'sets concurrency metrics' do
let(:job) { {} } expect(concurrency_metric).to receive(:set).with({}, Sidekiq.options[:concurrency].to_i)
let(:job_status) { :done }
let(:labels_with_job_status) { labels.merge(job_status: job_status.to_s) }
let(:default_labels) do
{ queue: queue.to_s,
worker: worker_class.to_s,
boundary: "",
external_dependencies: "no",
feature_category: "",
urgency: "low" }
end
shared_examples "a metrics middleware" do
context "with mocked prometheus" do
let(:concurrency_metric) { double('concurrency metric') }
let(:queue_duration_seconds) { double('queue duration seconds metric') }
let(:completion_seconds_metric) { double('completion seconds metric') }
let(:user_execution_seconds_metric) { double('user execution seconds metric') }
let(:db_seconds_metric) { double('db seconds metric') }
let(:gitaly_seconds_metric) { double('gitaly seconds metric') }
let(:failed_total_metric) { double('failed total metric') }
let(:retried_total_metric) { double('retried total metric') }
let(:redis_requests_total) { double('redis calls total metric') }
let(:running_jobs_metric) { double('running jobs metric') }
let(:redis_seconds_metric) { double('redis seconds metric') }
let(:elasticsearch_seconds_metric) { double('elasticsearch seconds metric') }
let(:elasticsearch_requests_total) { double('elasticsearch calls total metric') }
before do subject
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_queue_duration_seconds, anything, anything, anything).and_return(queue_duration_seconds)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_completion_seconds, anything, anything, anything).and_return(completion_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_cpu_seconds, anything, anything, anything).and_return(user_execution_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_db_seconds, anything, anything, anything).and_return(db_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_gitaly_seconds, anything, anything, anything).and_return(gitaly_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_redis_requests_duration_seconds, anything, anything, anything).and_return(redis_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_elasticsearch_requests_duration_seconds, anything, anything, anything).and_return(elasticsearch_seconds_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_failed_total, anything).and_return(failed_total_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_retried_total, anything).and_return(retried_total_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_redis_requests_total, anything).and_return(redis_requests_total)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_elasticsearch_requests_total, anything).and_return(elasticsearch_requests_total)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_running_jobs, anything, {}, :all).and_return(running_jobs_metric)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_concurrency, anything, {}, :all).and_return(concurrency_metric)
allow(concurrency_metric).to receive(:set)
end end
end
describe '#initialize' do describe '#call' do
it 'sets concurrency metrics' do include_context 'server metrics call'
expect(concurrency_metric).to receive(:set).with({}, Sidekiq.options[:concurrency].to_i)
subject it 'yields block' do
end expect { |b| subject.call(worker, job, :test, &b) }.to yield_control.once
end end
describe '#call' do it 'calls BackgroundTransaction' do
let(:thread_cputime_before) { 1 } expect_next_instance_of(Gitlab::Metrics::BackgroundTransaction) do |instance|
let(:thread_cputime_after) { 2 } expect(instance).to receive(:run)
let(:thread_cputime_duration) { thread_cputime_after - thread_cputime_before }
let(:monotonic_time_before) { 11 }
let(:monotonic_time_after) { 20 }
let(:monotonic_time_duration) { monotonic_time_after - monotonic_time_before }
let(:queue_duration_for_job) { 0.01 }
let(:db_duration) { 3 }
let(:gitaly_duration) { 4 }
let(:redis_calls) { 2 }
let(:redis_duration) { 0.01 }
let(:elasticsearch_calls) { 8 }
let(:elasticsearch_duration) { 0.54 }
let(:instrumentation) do
{
gitaly_duration_s: gitaly_duration,
redis_calls: redis_calls,
redis_duration_s: redis_duration,
elasticsearch_calls: elasticsearch_calls,
elasticsearch_duration_s: elasticsearch_duration
}
end end
before do subject.call(worker, job, :test) {}
allow(subject).to receive(:get_thread_cputime).and_return(thread_cputime_before, thread_cputime_after) end
allow(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(monotonic_time_before, monotonic_time_after)
allow(Gitlab::InstrumentationHelper).to receive(:queue_duration_for_job).with(job).and_return(queue_duration_for_job)
allow(ActiveRecord::LogSubscriber).to receive(:runtime).and_return(db_duration * 1000)
job[:instrumentation] = instrumentation
allow(running_jobs_metric).to receive(:increment)
allow(redis_requests_total).to receive(:increment)
allow(elasticsearch_requests_total).to receive(:increment)
allow(queue_duration_seconds).to receive(:observe)
allow(user_execution_seconds_metric).to receive(:observe)
allow(db_seconds_metric).to receive(:observe)
allow(gitaly_seconds_metric).to receive(:observe)
allow(completion_seconds_metric).to receive(:observe)
allow(redis_seconds_metric).to receive(:observe)
allow(elasticsearch_seconds_metric).to receive(:observe)
end
it 'yields block' do
expect { |b| subject.call(worker, job, :test, &b) }.to yield_control.once
end
it 'calls BackgroundTransaction' do it 'sets queue specific metrics' do
expect_next_instance_of(Gitlab::Metrics::BackgroundTransaction) do |instance| expect(running_jobs_metric).to receive(:increment).with(labels, -1)
expect(instance).to receive(:run) expect(running_jobs_metric).to receive(:increment).with(labels, 1)
end expect(queue_duration_seconds).to receive(:observe).with(labels, queue_duration_for_job) if queue_duration_for_job
expect(user_execution_seconds_metric).to receive(:observe).with(labels_with_job_status, thread_cputime_duration)
expect(db_seconds_metric).to receive(:observe).with(labels_with_job_status, db_duration)
expect(gitaly_seconds_metric).to receive(:observe).with(labels_with_job_status, gitaly_duration)
expect(completion_seconds_metric).to receive(:observe).with(labels_with_job_status, monotonic_time_duration)
expect(redis_seconds_metric).to receive(:observe).with(labels_with_job_status, redis_duration)
expect(elasticsearch_seconds_metric).to receive(:observe).with(labels_with_job_status, elasticsearch_duration)
expect(redis_requests_total).to receive(:increment).with(labels_with_job_status, redis_calls)
expect(elasticsearch_requests_total).to receive(:increment).with(labels_with_job_status, elasticsearch_calls)
subject.call(worker, job, :test) { nil }
end
subject.call(worker, job, :test) {} it 'sets the thread name if it was nil' do
end allow(Thread.current).to receive(:name).and_return(nil)
expect(Thread.current).to receive(:name=).with(Gitlab::Metrics::Samplers::ThreadsSampler::SIDEKIQ_WORKER_THREAD_NAME)
it 'sets queue specific metrics' do subject.call(worker, job, :test) { nil }
expect(running_jobs_metric).to receive(:increment).with(labels, -1) end
expect(running_jobs_metric).to receive(:increment).with(labels, 1)
expect(queue_duration_seconds).to receive(:observe).with(labels, queue_duration_for_job) if queue_duration_for_job
expect(user_execution_seconds_metric).to receive(:observe).with(labels_with_job_status, thread_cputime_duration)
expect(db_seconds_metric).to receive(:observe).with(labels_with_job_status, db_duration)
expect(gitaly_seconds_metric).to receive(:observe).with(labels_with_job_status, gitaly_duration)
expect(completion_seconds_metric).to receive(:observe).with(labels_with_job_status, monotonic_time_duration)
expect(redis_seconds_metric).to receive(:observe).with(labels_with_job_status, redis_duration)
expect(elasticsearch_seconds_metric).to receive(:observe).with(labels_with_job_status, elasticsearch_duration)
expect(redis_requests_total).to receive(:increment).with(labels_with_job_status, redis_calls)
expect(elasticsearch_requests_total).to receive(:increment).with(labels_with_job_status, elasticsearch_calls)
subject.call(worker, job, :test) { nil } context 'when job_duration is not available' do
end let(:queue_duration_for_job) { nil }
it 'sets the thread name if it was nil' do it 'does not set the queue_duration_seconds histogram' do
allow(Thread.current).to receive(:name).and_return(nil) expect(queue_duration_seconds).not_to receive(:observe)
expect(Thread.current).to receive(:name=).with(Gitlab::Metrics::Samplers::ThreadsSampler::SIDEKIQ_WORKER_THREAD_NAME)
subject.call(worker, job, :test) { nil } subject.call(worker, job, :test) { nil }
end end
end
context 'when job_duration is not available' do context 'when error is raised' do
let(:queue_duration_for_job) { nil } let(:job_status) { :fail }
it 'does not set the queue_duration_seconds histogram' do
expect(queue_duration_seconds).not_to receive(:observe)
subject.call(worker, job, :test) { nil }
end
end
context 'when error is raised' do
let(:job_status) { :fail }
it 'sets sidekiq_jobs_failed_total and reraises' do
expect(failed_total_metric).to receive(:increment).with(labels, 1)
expect { subject.call(worker, job, :test) { raise StandardError, "Failed" } }.to raise_error(StandardError, "Failed")
end
end
context 'when job is retried' do
let(:job) { { 'retry_count' => 1 } }
it 'sets sidekiq_jobs_retried_total metric' do it 'sets sidekiq_jobs_failed_total and reraises' do
expect(retried_total_metric).to receive(:increment) expect(failed_total_metric).to receive(:increment).with(labels, 1)
subject.call(worker, job, :test) { nil } expect { subject.call(worker, job, :test) { raise StandardError, "Failed" } }.to raise_error(StandardError, "Failed")
end
end end
end end
end
context "with prometheus integrated" do context 'when job is retried' do
describe '#call' do let(:job) { { 'retry_count' => 1 } }
it 'yields block' do
expect { |b| subject.call(worker, job, :test, &b) }.to yield_control.once
end
context 'when error is raised' do it 'sets sidekiq_jobs_retried_total metric' do
let(:job_status) { :fail } expect(retried_total_metric).to receive(:increment)
it 'sets sidekiq_jobs_failed_total and reraises' do subject.call(worker, job, :test) { nil }
expect { subject.call(worker, job, :test) { raise StandardError, "Failed" } }.to raise_error(StandardError, "Failed")
end
end end
end end
end end
end end
context "when workers are not attributed" do context "with prometheus integrated" do
before do describe '#call' do
stub_const('TestNonAttributedWorker', Class.new) it 'yields block' do
TestNonAttributedWorker.class_eval do expect { |b| subject.call(worker, job, :test, &b) }.to yield_control.once
include Sidekiq::Worker
end end
end
let(:worker) { TestNonAttributedWorker.new } context 'when error is raised' do
let(:labels) { default_labels.merge(urgency: "") } let(:job_status) { :fail }
it_behaves_like "a metrics middleware" it 'sets sidekiq_jobs_failed_total and reraises' do
end expect { subject.call(worker, job, :test) { raise StandardError, "Failed" } }.to raise_error(StandardError, "Failed")
end
context "when a worker is wrapped into ActiveJob" do
before do
stub_const('TestWrappedWorker', Class.new)
TestWrappedWorker.class_eval do
include Sidekiq::Worker
end end
end end
let(:job) do
{
"class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
"wrapped" => TestWrappedWorker
}
end
let(:worker) { TestWrappedWorker.new }
let(:worker_class) { TestWrappedWorker }
let(:labels) { default_labels.merge(urgency: "") }
it_behaves_like "a metrics middleware"
end
context 'for ActionMailer::MailDeliveryJob' do
let(:job) { { 'class' => ActionMailer::MailDeliveryJob } }
let(:worker) { ActionMailer::MailDeliveryJob.new }
let(:worker_class) { ActionMailer::MailDeliveryJob }
let(:labels) { default_labels.merge(feature_category: 'issue_tracking') }
it_behaves_like 'a metrics middleware'
end end
end
context "when workers are attributed" do it_behaves_like 'metrics middleware with worker attribution' do
def create_attributed_worker_class(urgency, external_dependencies, resource_boundary, category) let(:job_status) { :done }
Class.new do let(:labels_with_job_status) { labels.merge(job_status: job_status.to_s) }
include Sidekiq::Worker
include WorkerAttributes
urgency urgency if urgency
worker_has_external_dependencies! if external_dependencies
worker_resource_boundary resource_boundary unless resource_boundary == :unknown
feature_category category unless category.nil?
end
end
let(:urgency) { nil }
let(:external_dependencies) { false }
let(:resource_boundary) { :unknown }
let(:feature_category) { nil }
let(:worker_class) { create_attributed_worker_class(urgency, external_dependencies, resource_boundary, feature_category) }
let(:worker) { worker_class.new }
context "high urgency" do
let(:urgency) { :high }
let(:labels) { default_labels.merge(urgency: "high") }
it_behaves_like "a metrics middleware"
end
context "external dependencies" do
let(:external_dependencies) { true }
let(:labels) { default_labels.merge(external_dependencies: "yes") }
it_behaves_like "a metrics middleware"
end
context "cpu boundary" do
let(:resource_boundary) { :cpu }
let(:labels) { default_labels.merge(boundary: "cpu") }
it_behaves_like "a metrics middleware"
end
context "memory boundary" do
let(:resource_boundary) { :memory }
let(:labels) { default_labels.merge(boundary: "memory") }
it_behaves_like "a metrics middleware"
end
context "feature category" do
let(:feature_category) { :authentication }
let(:labels) { default_labels.merge(feature_category: "authentication") }
it_behaves_like "a metrics middleware"
end
context "combined" do
let(:urgency) { :throttled }
let(:external_dependencies) { true }
let(:resource_boundary) { :cpu }
let(:feature_category) { :authentication }
let(:labels) { default_labels.merge(urgency: "throttled", external_dependencies: "yes", boundary: "cpu", feature_category: "authentication") }
it_behaves_like "a metrics middleware"
end
end
end end
end end
# rubocop: enable RSpec/MultipleMemoizedHelpers # rubocop: enable RSpec/MultipleMemoizedHelpers
...@@ -69,11 +69,13 @@ RSpec.describe Gitlab::SidekiqMiddleware do ...@@ -69,11 +69,13 @@ RSpec.describe Gitlab::SidekiqMiddleware do
shared_examples "a server middleware chain" do shared_examples "a server middleware chain" do
it "passes through the right server middlewares" do it "passes through the right server middlewares" do
enabled_sidekiq_middlewares.each do |middleware| enabled_sidekiq_middlewares.each do |middleware|
expect_any_instance_of(middleware).to receive(:call).with(*middleware_expected_args).once.and_call_original expect_next_instance_of(middleware) do |middleware_instance|
expect(middleware_instance).to receive(:call).with(*middleware_expected_args).once.and_call_original
end
end end
disabled_sidekiq_middlewares.each do |middleware| disabled_sidekiq_middlewares.each do |middleware|
expect_any_instance_of(middleware).not_to receive(:call) expect(middleware).not_to receive(:new)
end end
worker_class.perform_async(*job_args) worker_class.perform_async(*job_args)
......
# frozen_string_literal: true
RSpec.shared_context 'server metrics with mocked prometheus' do
let(:concurrency_metric) { double('concurrency metric') }
let(:queue_duration_seconds) { double('queue duration seconds metric') }
let(:completion_seconds_metric) { double('completion seconds metric') }
let(:user_execution_seconds_metric) { double('user execution seconds metric') }
let(:db_seconds_metric) { double('db seconds metric') }
let(:gitaly_seconds_metric) { double('gitaly seconds metric') }
let(:failed_total_metric) { double('failed total metric') }
let(:retried_total_metric) { double('retried total metric') }
let(:redis_requests_total) { double('redis calls total metric') }
let(:running_jobs_metric) { double('running jobs metric') }
let(:redis_seconds_metric) { double('redis seconds metric') }
let(:elasticsearch_seconds_metric) { double('elasticsearch seconds metric') }
let(:elasticsearch_requests_total) { double('elasticsearch calls total metric') }
before do
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_queue_duration_seconds, anything, anything, anything).and_return(queue_duration_seconds)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_completion_seconds, anything, anything, anything).and_return(completion_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_cpu_seconds, anything, anything, anything).and_return(user_execution_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_db_seconds, anything, anything, anything).and_return(db_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_jobs_gitaly_seconds, anything, anything, anything).and_return(gitaly_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_redis_requests_duration_seconds, anything, anything, anything).and_return(redis_seconds_metric)
allow(Gitlab::Metrics).to receive(:histogram).with(:sidekiq_elasticsearch_requests_duration_seconds, anything, anything, anything).and_return(elasticsearch_seconds_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_failed_total, anything).and_return(failed_total_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_jobs_retried_total, anything).and_return(retried_total_metric)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_redis_requests_total, anything).and_return(redis_requests_total)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_elasticsearch_requests_total, anything).and_return(elasticsearch_requests_total)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_running_jobs, anything, {}, :all).and_return(running_jobs_metric)
allow(Gitlab::Metrics).to receive(:gauge).with(:sidekiq_concurrency, anything, {}, :all).and_return(concurrency_metric)
allow(concurrency_metric).to receive(:set)
end
end
RSpec.shared_context 'server metrics call' do
let(:thread_cputime_before) { 1 }
let(:thread_cputime_after) { 2 }
let(:thread_cputime_duration) { thread_cputime_after - thread_cputime_before }
let(:monotonic_time_before) { 11 }
let(:monotonic_time_after) { 20 }
let(:monotonic_time_duration) { monotonic_time_after - monotonic_time_before }
let(:queue_duration_for_job) { 0.01 }
let(:db_duration) { 3 }
let(:gitaly_duration) { 4 }
let(:redis_calls) { 2 }
let(:redis_duration) { 0.01 }
let(:elasticsearch_calls) { 8 }
let(:elasticsearch_duration) { 0.54 }
let(:instrumentation) do
{
gitaly_duration_s: gitaly_duration,
redis_calls: redis_calls,
redis_duration_s: redis_duration,
elasticsearch_calls: elasticsearch_calls,
elasticsearch_duration_s: elasticsearch_duration
}
end
before do
allow(subject).to receive(:get_thread_cputime).and_return(thread_cputime_before, thread_cputime_after)
allow(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(monotonic_time_before, monotonic_time_after)
allow(Gitlab::InstrumentationHelper).to receive(:queue_duration_for_job).with(job).and_return(queue_duration_for_job)
allow(ActiveRecord::LogSubscriber).to receive(:runtime).and_return(db_duration * 1000)
job[:instrumentation] = instrumentation
job[:gitaly_duration_s] = gitaly_duration
job[:redis_calls] = redis_calls
job[:redis_duration_s] = redis_duration
job[:elasticsearch_calls] = elasticsearch_calls
job[:elasticsearch_duration_s] = elasticsearch_duration
allow(running_jobs_metric).to receive(:increment)
allow(redis_requests_total).to receive(:increment)
allow(elasticsearch_requests_total).to receive(:increment)
allow(queue_duration_seconds).to receive(:observe)
allow(user_execution_seconds_metric).to receive(:observe)
allow(db_seconds_metric).to receive(:observe)
allow(gitaly_seconds_metric).to receive(:observe)
allow(completion_seconds_metric).to receive(:observe)
allow(redis_seconds_metric).to receive(:observe)
allow(elasticsearch_seconds_metric).to receive(:observe)
end
end
# frozen_string_literal: true
RSpec.shared_examples 'metrics middleware with worker attribution' do
subject { described_class.new }
let(:queue) { :test }
let(:worker_class) { worker.class }
let(:job) { {} }
let(:default_labels) do
{ queue: queue.to_s,
worker: worker_class.to_s,
boundary: "",
external_dependencies: "no",
feature_category: "",
urgency: "low" }
end
context "when workers are not attributed" do
before do
stub_const('TestNonAttributedWorker', Class.new)
TestNonAttributedWorker.class_eval do
include Sidekiq::Worker
end
end
it_behaves_like "a metrics middleware" do
let(:worker) { TestNonAttributedWorker.new }
let(:labels) { default_labels.merge(urgency: "") }
end
end
context "when a worker is wrapped into ActiveJob" do
before do
stub_const('TestWrappedWorker', Class.new)
TestWrappedWorker.class_eval do
include Sidekiq::Worker
end
end
it_behaves_like "a metrics middleware" do
let(:job) do
{
"class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
"wrapped" => TestWrappedWorker
}
end
let(:worker) { TestWrappedWorker.new }
let(:labels) { default_labels.merge(urgency: "") }
end
end
context "when workers are attributed" do
def create_attributed_worker_class(urgency, external_dependencies, resource_boundary, category)
klass = Class.new do
include Sidekiq::Worker
include WorkerAttributes
urgency urgency if urgency
worker_has_external_dependencies! if external_dependencies
worker_resource_boundary resource_boundary unless resource_boundary == :unknown
feature_category category unless category.nil?
end
stub_const("TestAttributedWorker", klass)
end
let(:urgency) { nil }
let(:external_dependencies) { false }
let(:resource_boundary) { :unknown }
let(:feature_category) { nil }
let(:worker_class) { create_attributed_worker_class(urgency, external_dependencies, resource_boundary, feature_category) }
let(:worker) { worker_class.new }
context "high urgency" do
it_behaves_like "a metrics middleware" do
let(:urgency) { :high }
let(:labels) { default_labels.merge(urgency: "high") }
end
end
context "no urgency" do
it_behaves_like "a metrics middleware" do
let(:urgency) { :throttled }
let(:labels) { default_labels.merge(urgency: "throttled") }
end
end
context "external dependencies" do
it_behaves_like "a metrics middleware" do
let(:external_dependencies) { true }
let(:labels) { default_labels.merge(external_dependencies: "yes") }
end
end
context "cpu boundary" do
it_behaves_like "a metrics middleware" do
let(:resource_boundary) { :cpu }
let(:labels) { default_labels.merge(boundary: "cpu") }
end
end
context "memory boundary" do
it_behaves_like "a metrics middleware" do
let(:resource_boundary) { :memory }
let(:labels) { default_labels.merge(boundary: "memory") }
end
end
context "feature category" do
it_behaves_like "a metrics middleware" do
let(:feature_category) { :authentication }
let(:labels) { default_labels.merge(feature_category: "authentication") }
end
end
context "combined" do
it_behaves_like "a metrics middleware" do
let(:urgency) { :high }
let(:external_dependencies) { true }
let(:resource_boundary) { :cpu }
let(:feature_category) { :authentication }
let(:labels) do
default_labels.merge(
urgency: "high",
external_dependencies: "yes",
boundary: "cpu",
feature_category: "authentication")
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment