Commit 248110cd authored by Matthias Käppler's avatar Matthias Käppler Committed by Jan Provaznik

Add more node metrics to topology usage data

- node CPU
- service memory
- service process count
parent a70da6da
......@@ -71,15 +71,16 @@ module Gitlab
end
end
# Queries Prometheus for values aggregated by the given label string.
# Queries Prometheus with the given aggregate query and groups the results by mapping
# metric labels to their respective values.
#
# @return [Hash] mapping labels to their aggregate numeric values, or the empty hash if no results were found
def aggregate(func:, metric:, by:, time: Time.now)
response = query("#{func} (#{metric}) by (#{by})", time: time)
def aggregate(aggregate_query, time: Time.now)
response = query(aggregate_query, time: time)
response.to_h do |result|
group_name = result.dig('metric', by)
key = block_given? ? yield(result['metric']) : result['metric']
_timestamp, value = result['value']
[group_name, value.to_i]
[key, value.to_i]
end
end
......
......@@ -18,6 +18,7 @@ module Gitlab
class << self
include Gitlab::Utils::UsageData
include Gitlab::Utils::StrongMemoize
include Gitlab::UsageDataConcerns::Topology
def data(force_refresh: false)
Rails.cache.fetch('usage_data', force: force_refresh, expires_in: 2.weeks) do
......@@ -247,25 +248,6 @@ module Gitlab
}
end
def topology_usage_data
topology_data, duration = measure_duration do
alt_usage_data(fallback: {}) do
{
nodes: topology_node_data
}.compact
end
end
{ topology: topology_data.merge(duration_s: duration) }
end
def topology_node_data
with_prometheus_client do |client|
by_instance_mem =
client.aggregate(func: 'avg', metric: 'node_memory_MemTotal_bytes', by: 'instance').compact
by_instance_mem.values.map { |v| { node_memory_total_bytes: v } }
end
end
def app_server_type
Gitlab::Runtime.identify.to_s
rescue Gitlab::Runtime::IdentificationError => e
......
# frozen_string_literal: true
module Gitlab
module UsageDataConcerns
module Topology
include Gitlab::Utils::UsageData
def topology_usage_data
topology_data, duration = measure_duration do
alt_usage_data(fallback: {}) do
{
nodes: topology_node_data
}.compact
end
end
{ topology: topology_data.merge(duration_s: duration) }
end
private
def topology_node_data
with_prometheus_client do |client|
# node-level data
by_instance_mem = topology_node_memory(client)
by_instance_cpus = topology_node_cpus(client)
# service-level data
by_instance_by_job_by_metric_memory = topology_all_service_memory(client)
by_instance_by_job_process_count = topology_all_service_process_count(client)
instances = Set.new(by_instance_mem.keys + by_instance_cpus.keys)
instances.map do |instance|
{
node_memory_total_bytes: by_instance_mem[instance],
node_cpus: by_instance_cpus[instance],
node_services:
topology_node_services(instance, by_instance_by_job_process_count, by_instance_by_job_by_metric_memory)
}.compact
end
end
end
def topology_node_memory(client)
aggregate_single(client, 'avg (node_memory_MemTotal_bytes) by (instance)')
end
def topology_node_cpus(client)
aggregate_single(client, 'count (node_cpu_seconds_total{mode="idle"}) by (instance)')
end
def topology_all_service_memory(client)
aggregate_many(
client,
'avg ({__name__=~"ruby_process_(resident|unique|proportional)_memory_bytes"}) by (instance, job, __name__)'
)
end
def topology_all_service_process_count(client)
aggregate_many(client, 'count (ruby_process_start_time_seconds) by (instance, job)')
end
def topology_node_services(instance, all_process_counts, all_process_memory)
# returns all node service data grouped by service name as the key
instance_service_data =
topology_instance_service_process_count(instance, all_process_counts)
.deep_merge(topology_instance_service_memory(instance, all_process_memory))
# map to list of hashes where service name becomes a value instead
instance_service_data.map do |service, data|
{ name: service.to_s }.merge(data)
end
end
def topology_instance_service_process_count(instance, all_instance_data)
topology_data_for_instance(instance, all_instance_data).to_h do |metric, count|
job = metric['job'].underscore.to_sym
[job, { process_count: count }]
end
end
def topology_instance_service_memory(instance, all_instance_data)
topology_data_for_instance(instance, all_instance_data).each_with_object({}) do |entry, hash|
metric, memory = entry
job = metric['job'].underscore.to_sym
key =
case metric['__name__']
when 'ruby_process_resident_memory_bytes' then :process_memory_rss
when 'ruby_process_unique_memory_bytes' then :process_memory_uss
when 'ruby_process_proportional_memory_bytes' then :process_memory_pss
end
hash[job] ||= {}
hash[job][key] ||= memory
end
end
def topology_data_for_instance(instance, all_instance_data)
all_instance_data.filter { |metric, _value| metric['instance'] == instance }
end
def drop_port(instance)
instance.gsub(/:.+$/, '')
end
# Will retain a single `instance` key that values are mapped to
def aggregate_single(client, query)
client.aggregate(query) { |metric| drop_port(metric['instance']) }
end
# Will retain a composite key that values are mapped to
def aggregate_many(client, query)
client.aggregate(query) do |metric|
metric['instance'] = drop_port(metric['instance'])
metric
end
end
end
end
end
......@@ -172,8 +172,7 @@ describe Gitlab::PrometheusClient do
end
describe '#aggregate' do
let(:user_query) { { func: 'avg', metric: 'metric', by: 'job' } }
let(:prometheus_query) { 'avg (metric) by (job)' }
let(:query) { 'avg (metric) by (job)' }
let(:prometheus_response) do
{
"status": "success",
......@@ -192,19 +191,19 @@ describe Gitlab::PrometheusClient do
}
}
end
let(:query_url) { prometheus_query_with_time_url(prometheus_query, Time.now.utc) }
let(:query_url) { prometheus_query_with_time_url(query, Time.now.utc) }
around do |example|
Timecop.freeze { example.run }
end
context 'when request returns vector results' do
it 'returns data from the API call' do
it 'returns data from the API call grouped by labels' do
req_stub = stub_prometheus_request(query_url, body: prometheus_response)
expect(subject.aggregate(user_query)).to eq({
"gitlab-rails" => 1,
"gitlab-sidekiq" => 2
expect(subject.aggregate(query)).to eq({
{ "job" => "gitlab-rails" } => 1,
{ "job" => "gitlab-sidekiq" } => 2
})
expect(req_stub).to have_been_requested
end
......@@ -214,13 +213,13 @@ describe Gitlab::PrometheusClient do
it 'returns {}' do
req_stub = stub_prometheus_request(query_url, body: prometheus_empty_body('vector'))
expect(subject.aggregate(user_query)).to eq({})
expect(subject.aggregate(query)).to eq({})
expect(req_stub).to have_been_requested
end
end
it_behaves_like 'failure response' do
let(:execute_query) { subject.aggregate(user_query) }
let(:execute_query) { subject.aggregate(query) }
end
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::UsageDataConcerns::Topology do
include UsageDataHelpers
describe '#topology_usage_data' do
subject { Class.new.extend(described_class).topology_usage_data }
before do
# this pins down time shifts when benchmarking durations
allow(Process).to receive(:clock_gettime).and_return(0)
end
context 'when embedded Prometheus server is enabled' do
before do
expect(Gitlab::Prometheus::Internal).to receive(:prometheus_enabled?).and_return(true)
expect(Gitlab::Prometheus::Internal).to receive(:uri).and_return('http://prom:9090')
end
it 'contains a topology element' do
allow_prometheus_queries
expect(subject).to have_key(:topology)
end
context 'tracking node metrics' do
it 'contains node level metrics for each instance' do
expect_prometheus_api_to(
receive_node_memory_query,
receive_node_cpu_count_query,
receive_node_service_memory_query,
receive_node_service_process_count_query
)
expect(subject[:topology]).to eq({
duration_s: 0,
nodes: [
{
node_memory_total_bytes: 512,
node_cpus: 8,
node_services: [
{
name: 'gitlab_rails',
process_count: 10,
process_memory_rss: 300,
process_memory_uss: 301,
process_memory_pss: 302
},
{
name: 'gitlab_sidekiq',
process_count: 5,
process_memory_rss: 303
}
]
},
{
node_memory_total_bytes: 1024,
node_cpus: 16,
node_services: [
{
name: 'gitlab_sidekiq',
process_count: 15,
process_memory_rss: 400,
process_memory_pss: 401
}
]
}
]
})
end
end
context 'and some node memory metrics are missing' do
it 'removes the respective entries' do
expect_prometheus_api_to(
receive_node_memory_query(result: []),
receive_node_cpu_count_query,
receive_node_service_memory_query,
receive_node_service_process_count_query
)
keys = subject[:topology][:nodes].flat_map(&:keys)
expect(keys).not_to include(:node_memory_total_bytes)
expect(keys).to include(:node_cpus, :node_services)
end
end
context 'and no results are found' do
it 'does not report anything' do
expect_prometheus_api_to receive(:aggregate).at_least(:once).and_return({})
expect(subject[:topology]).to eq({
duration_s: 0,
nodes: []
})
end
end
context 'and a connection error is raised' do
it 'does not report anything' do
expect_prometheus_api_to receive(:aggregate).and_raise('Connection failed')
expect(subject[:topology]).to eq({ duration_s: 0 })
end
end
end
context 'when embedded Prometheus server is disabled' do
it 'does not report anything' do
expect(Gitlab::Prometheus::Internal).to receive(:prometheus_enabled?).and_return(false)
expect(subject[:topology]).to eq({ duration_s: 0 })
end
end
end
def receive_node_memory_query(result: nil)
receive(:query)
.with('avg (node_memory_MemTotal_bytes) by (instance)', an_instance_of(Hash))
.and_return(result || [
{
'metric' => { 'instance' => 'instance1:8080' },
'value' => [1000, '512']
},
{
'metric' => { 'instance' => 'instance2:8090' },
'value' => [1000, '1024']
}
])
end
def receive_node_cpu_count_query(result: nil)
receive(:query)
.with('count (node_cpu_seconds_total{mode="idle"}) by (instance)', an_instance_of(Hash))
.and_return(result || [
{
'metric' => { 'instance' => 'instance2:8090' },
'value' => [1000, '16']
},
{
'metric' => { 'instance' => 'instance1:8080' },
'value' => [1000, '8']
}
])
end
def receive_node_service_memory_query(result: nil)
receive(:query)
.with('avg ({__name__=~"ruby_process_(resident|unique|proportional)_memory_bytes"}) by (instance, job, __name__)', an_instance_of(Hash))
.and_return(result || [
# instance 1: runs Puma + a small Sidekiq
{
'metric' => { 'instance' => 'instance1:8080', 'job' => 'gitlab-rails', '__name__' => 'ruby_process_resident_memory_bytes' },
'value' => [1000, '300']
},
{
'metric' => { 'instance' => 'instance1:8080', 'job' => 'gitlab-rails', '__name__' => 'ruby_process_unique_memory_bytes' },
'value' => [1000, '301']
},
{
'metric' => { 'instance' => 'instance1:8080', 'job' => 'gitlab-rails', '__name__' => 'ruby_process_proportional_memory_bytes' },
'value' => [1000, '302']
},
{
'metric' => { 'instance' => 'instance1:8090', 'job' => 'gitlab-sidekiq', '__name__' => 'ruby_process_resident_memory_bytes' },
'value' => [1000, '303']
},
# instance 2: runs a dedicated Sidekiq
{
'metric' => { 'instance' => 'instance2:8090', 'job' => 'gitlab-sidekiq', '__name__' => 'ruby_process_resident_memory_bytes' },
'value' => [1000, '400']
},
{
'metric' => { 'instance' => 'instance2:8090', 'job' => 'gitlab-sidekiq', '__name__' => 'ruby_process_proportional_memory_bytes' },
'value' => [1000, '401']
}
])
end
def receive_node_service_process_count_query(result: nil)
receive(:query)
.with('count (ruby_process_start_time_seconds) by (instance, job)', an_instance_of(Hash))
.and_return(result || [
# instance 1
{
'metric' => { 'instance' => 'instance1:8080', 'job' => 'gitlab-rails' },
'value' => [1000, '10']
},
{
'metric' => { 'instance' => 'instance1:8090', 'job' => 'gitlab-sidekiq' },
'value' => [1000, '5']
},
# instance 2
{
'metric' => { 'instance' => 'instance2:8090', 'job' => 'gitlab-sidekiq' },
'value' => [1000, '15']
}
])
end
end
......@@ -115,6 +115,10 @@ describe Gitlab::UsageData, :aggregate_failures do
)
end
it 'gathers topology data' do
expect(subject.keys).to include(:topology)
end
context 'with existing container expiration policies' do
let_it_be(:disabled) { create(:container_expiration_policy, enabled: false) }
let_it_be(:enabled) { create(:container_expiration_policy, enabled: true) }
......@@ -278,88 +282,6 @@ describe Gitlab::UsageData, :aggregate_failures do
end
end
describe '#topology_usage_data' do
subject { described_class.topology_usage_data }
before do
# this pins down time shifts when benchmarking durations
allow(Process).to receive(:clock_gettime).and_return(0)
end
context 'when embedded Prometheus server is enabled' do
before do
expect(Gitlab::Prometheus::Internal).to receive(:prometheus_enabled?).and_return(true)
expect(Gitlab::Prometheus::Internal).to receive(:uri).and_return('http://prom:9090')
end
it 'contains a topology element' do
allow_prometheus_queries
expect(subject).to have_key(:topology)
end
context 'tracking node metrics' do
it 'contains node level metrics for each instance' do
expect_prometheus_api_to receive(:aggregate)
.with(func: 'avg', metric: 'node_memory_MemTotal_bytes', by: 'instance')
.and_return({
'instance1' => 512,
'instance2' => 1024
})
expect(subject[:topology]).to eq({
duration_s: 0,
nodes: [
{
node_memory_total_bytes: 512
},
{
node_memory_total_bytes: 1024
}
]
})
end
end
context 'and no results are found' do
it 'does not report anything' do
expect_prometheus_api_to receive(:aggregate).and_return({})
expect(subject[:topology]).to eq({
duration_s: 0,
nodes: []
})
end
end
context 'and a connection error is raised' do
it 'does not report anything' do
expect_prometheus_api_to receive(:aggregate).and_raise('Connection failed')
expect(subject[:topology]).to eq({ duration_s: 0 })
end
end
end
context 'when embedded Prometheus server is disabled' do
it 'does not report anything' do
expect(subject[:topology]).to eq({ duration_s: 0 })
end
end
def expect_prometheus_api_to(receive_matcher)
expect_next_instance_of(Gitlab::PrometheusClient) do |client|
expect(client).to receive_matcher
end
end
def allow_prometheus_queries
allow_next_instance_of(Gitlab::PrometheusClient) do |client|
allow(client).to receive(:aggregate).and_return({})
end
end
end
describe '#app_server_type' do
subject { described_class.app_server_type }
......
......@@ -220,4 +220,16 @@ module UsageDataHelpers
'proxy_download' => false } }
)
end
def expect_prometheus_api_to(*receive_matchers)
expect_next_instance_of(Gitlab::PrometheusClient) do |client|
receive_matchers.each { |m| expect(client).to m }
end
end
def allow_prometheus_queries
allow_next_instance_of(Gitlab::PrometheusClient) do |client|
allow(client).to receive(:aggregate).and_return({})
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment