Commit 26086c43 authored by Heinrich Lee Yu's avatar Heinrich Lee Yu

Merge branch 'ak/historical-pods' into 'master'

Gather historical pod list from Elasticsearch

See merge request gitlab-org/gitlab!29168
parents 6d05f5b6 7d98b6e5
......@@ -62,13 +62,11 @@ module PodLogs
end
def get_raw_pods(result)
result[:raw_pods] = cluster.kubeclient.get_pods(namespace: namespace)
success(result)
raise NotImplementedError
end
def get_pod_names(result)
result[:pods] = result[:raw_pods].map(&:metadata).map(&:name)
result[:pods] = result[:raw_pods].map { |p| p[:name] }
success(result)
end
......
......@@ -23,6 +23,23 @@ module PodLogs
super + %i(cursor)
end
def get_raw_pods(result)
client = cluster&.application_elastic_stack&.elasticsearch_client
return error(_('Unable to connect to Elasticsearch')) unless client
result[:raw_pods] = ::Gitlab::Elasticsearch::Logs::Pods.new(client).pods(namespace)
success(result)
rescue Elasticsearch::Transport::Transport::ServerError => e
::Gitlab::ErrorTracking.track_exception(e)
error(_('Elasticsearch returned status code: %{status_code}') % {
# ServerError is the parent class of exceptions named after HTTP status codes, eg: "Elasticsearch::Transport::Transport::Errors::NotFound"
# there is no method on the exception other than the class name to determine the type of error encountered.
status_code: e.class.name.split('::').last
})
end
def check_times(result)
result[:start_time] = params['start_time'] if params.key?('start_time') && Time.iso8601(params['start_time'])
result[:end_time] = params['end_time'] if params.key?('end_time') && Time.iso8601(params['end_time'])
......@@ -48,7 +65,7 @@ module PodLogs
client = cluster&.application_elastic_stack&.elasticsearch_client
return error(_('Unable to connect to Elasticsearch')) unless client
response = ::Gitlab::Elasticsearch::Logs.new(client).pod_logs(
response = ::Gitlab::Elasticsearch::Logs::Lines.new(client).pod_logs(
namespace,
pod_name: result[:pod_name],
container_name: result[:container_name],
......@@ -69,7 +86,7 @@ module PodLogs
# there is no method on the exception other than the class name to determine the type of error encountered.
status_code: e.class.name.split('::').last
})
rescue ::Gitlab::Elasticsearch::Logs::InvalidCursor
rescue ::Gitlab::Elasticsearch::Logs::Lines::InvalidCursor
error(_('Invalid cursor value provided'))
end
end
......
......@@ -21,6 +21,17 @@ module PodLogs
private
def get_raw_pods(result)
result[:raw_pods] = cluster.kubeclient.get_pods(namespace: namespace).map do |pod|
{
name: pod.metadata.name,
container_names: pod.spec.containers.map(&:name)
}
end
success(result)
end
def check_pod_name(result)
# If pod_name is not received as parameter, get the pod logs of the first
# pod of this namespace.
......@@ -43,11 +54,11 @@ module PodLogs
end
def check_container_name(result)
pod_details = result[:raw_pods].find { |p| p.metadata.name == result[:pod_name] }
containers = pod_details.spec.containers.map(&:name)
pod_details = result[:raw_pods].find { |p| p[:name] == result[:pod_name] }
container_names = pod_details[:container_names]
# select first container if not specified
result[:container_name] ||= containers.first
result[:container_name] ||= container_names.first
unless result[:container_name]
return error(_('No containers available'))
......@@ -58,7 +69,7 @@ module PodLogs
' %{max_length} chars' % { max_length: K8S_NAME_MAX_LENGTH }))
end
unless containers.include?(result[:container_name])
unless container_names.include?(result[:container_name])
return error(_('Container does not exist'))
end
......
---
title: Gather historical pod list from Elasticsearch
merge_request: 29168
author:
type: added
# frozen_string_literal: true
module Gitlab
module Elasticsearch
class Logs
InvalidCursor = Class.new(RuntimeError)
# How many log lines to fetch in a query
LOGS_LIMIT = 500
def initialize(client)
@client = client
end
def pod_logs(namespace, pod_name: nil, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
query = { bool: { must: [] } }.tap do |q|
filter_pod_name(q, pod_name)
filter_namespace(q, namespace)
filter_container_name(q, container_name)
filter_search(q, search)
filter_times(q, start_time, end_time)
end
body = build_body(query, cursor)
response = @client.search body: body
format_response(response)
end
private
def build_body(query, cursor = nil)
body = {
query: query,
# reverse order so we can query N-most recent records
sort: [
{ "@timestamp": { order: :desc } },
{ "offset": { order: :desc } }
],
# only return these fields in the response
_source: ["@timestamp", "message", "kubernetes.pod.name"],
# fixed limit for now, we should support paginated queries
size: ::Gitlab::Elasticsearch::Logs::LOGS_LIMIT
}
unless cursor.nil?
body[:search_after] = decode_cursor(cursor)
end
body
end
def filter_pod_name(query, pod_name)
# We can filter by "all pods" with a null pod_name
return if pod_name.nil?
query[:bool][:must] << {
match_phrase: {
"kubernetes.pod.name" => {
query: pod_name
}
}
}
end
def filter_namespace(query, namespace)
query[:bool][:must] << {
match_phrase: {
"kubernetes.namespace" => {
query: namespace
}
}
}
end
def filter_container_name(query, container_name)
# A pod can contain multiple containers.
# By default we return logs from every container
return if container_name.nil?
query[:bool][:must] << {
match_phrase: {
"kubernetes.container.name" => {
query: container_name
}
}
}
end
def filter_search(query, search)
return if search.nil?
query[:bool][:must] << {
simple_query_string: {
query: search,
fields: [:message],
default_operator: :and
}
}
end
def filter_times(query, start_time, end_time)
return unless start_time || end_time
time_range = { range: { :@timestamp => {} } }.tap do |tr|
tr[:range][:@timestamp][:gte] = start_time if start_time
tr[:range][:@timestamp][:lt] = end_time if end_time
end
query[:bool][:filter] = [time_range]
end
def format_response(response)
results = response.fetch("hits", {}).fetch("hits", [])
last_result = results.last
results = results.map do |hit|
{
timestamp: hit["_source"]["@timestamp"],
message: hit["_source"]["message"],
pod: hit["_source"]["kubernetes"]["pod"]["name"]
}
end
# we queried for the N-most recent records but we want them ordered oldest to newest
{
logs: results.reverse,
cursor: last_result.nil? ? nil : encode_cursor(last_result["sort"])
}
end
# we want to hide the implementation details of the search_after parameter from the frontend
# behind a single easily transmitted value
def encode_cursor(obj)
obj.join(',')
end
def decode_cursor(obj)
cursor = obj.split(',').map(&:to_i)
unless valid_cursor(cursor)
raise InvalidCursor, "invalid cursor format"
end
cursor
end
def valid_cursor(cursor)
cursor.instance_of?(Array) &&
cursor.length == 2 &&
cursor.map {|i| i.instance_of?(Integer)}.reduce(:&)
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Elasticsearch
module Logs
class Lines
InvalidCursor = Class.new(RuntimeError)
# How many log lines to fetch in a query
LOGS_LIMIT = 500
def initialize(client)
@client = client
end
def pod_logs(namespace, pod_name: nil, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
query = { bool: { must: [] } }.tap do |q|
filter_pod_name(q, pod_name)
filter_namespace(q, namespace)
filter_container_name(q, container_name)
filter_search(q, search)
filter_times(q, start_time, end_time)
end
body = build_body(query, cursor)
response = @client.search body: body
format_response(response)
end
private
def build_body(query, cursor = nil)
body = {
query: query,
# reverse order so we can query N-most recent records
sort: [
{ "@timestamp": { order: :desc } },
{ "offset": { order: :desc } }
],
# only return these fields in the response
_source: ["@timestamp", "message", "kubernetes.pod.name"],
# fixed limit for now, we should support paginated queries
size: ::Gitlab::Elasticsearch::Logs::Lines::LOGS_LIMIT
}
unless cursor.nil?
body[:search_after] = decode_cursor(cursor)
end
body
end
def filter_pod_name(query, pod_name)
# We can filter by "all pods" with a null pod_name
return if pod_name.nil?
query[:bool][:must] << {
match_phrase: {
"kubernetes.pod.name" => {
query: pod_name
}
}
}
end
def filter_namespace(query, namespace)
query[:bool][:must] << {
match_phrase: {
"kubernetes.namespace" => {
query: namespace
}
}
}
end
def filter_container_name(query, container_name)
# A pod can contain multiple containers.
# By default we return logs from every container
return if container_name.nil?
query[:bool][:must] << {
match_phrase: {
"kubernetes.container.name" => {
query: container_name
}
}
}
end
def filter_search(query, search)
return if search.nil?
query[:bool][:must] << {
simple_query_string: {
query: search,
fields: [:message],
default_operator: :and
}
}
end
def filter_times(query, start_time, end_time)
return unless start_time || end_time
time_range = { range: { :@timestamp => {} } }.tap do |tr|
tr[:range][:@timestamp][:gte] = start_time if start_time
tr[:range][:@timestamp][:lt] = end_time if end_time
end
query[:bool][:filter] = [time_range]
end
def format_response(response)
results = response.fetch("hits", {}).fetch("hits", [])
last_result = results.last
results = results.map do |hit|
{
timestamp: hit["_source"]["@timestamp"],
message: hit["_source"]["message"],
pod: hit["_source"]["kubernetes"]["pod"]["name"]
}
end
# we queried for the N-most recent records but we want them ordered oldest to newest
{
logs: results.reverse,
cursor: last_result.nil? ? nil : encode_cursor(last_result["sort"])
}
end
# we want to hide the implementation details of the search_after parameter from the frontend
# behind a single easily transmitted value
def encode_cursor(obj)
obj.join(',')
end
def decode_cursor(obj)
cursor = obj.split(',').map(&:to_i)
unless valid_cursor(cursor)
raise InvalidCursor, "invalid cursor format"
end
cursor
end
def valid_cursor(cursor)
cursor.instance_of?(Array) &&
cursor.length == 2 &&
cursor.map {|i| i.instance_of?(Integer)}.reduce(:&)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Elasticsearch
module Logs
class Pods
# How many items to fetch in a query
PODS_LIMIT = 500
CONTAINERS_LIMIT = 500
def initialize(client)
@client = client
end
def pods(namespace)
body = build_body(namespace)
response = @client.search body: body
format_response(response)
end
private
def build_body(namespace)
{
aggs: {
pods: {
aggs: {
containers: {
terms: {
field: 'kubernetes.container.name',
size: ::Gitlab::Elasticsearch::Logs::Pods::CONTAINERS_LIMIT
}
}
},
terms: {
field: 'kubernetes.pod.name',
size: ::Gitlab::Elasticsearch::Logs::Pods::PODS_LIMIT
}
}
},
query: {
bool: {
must: {
match_phrase: {
"kubernetes.namespace": namespace
}
}
}
},
# don't populate hits, only the aggregation is needed
size: 0
}
end
def format_response(response)
results = response.dig("aggregations", "pods", "buckets") || []
results.map do |bucket|
{
name: bucket["key"],
container_names: (bucket.dig("containers", "buckets") || []).map do |cbucket|
cbucket["key"]
end
}
end
end
end
end
end
end
{
"aggs": {
"pods": {
"aggs": {
"containers": {
"terms": {
"field": "kubernetes.container.name",
"size": 500
}
}
},
"terms": {
"field": "kubernetes.pod.name",
"size": 500
}
}
},
"query": {
"bool": {
"must": {
"match_phrase": {
"kubernetes.namespace": "autodevops-deploy-9-production"
}
}
}
},
"size": 0
}
{
"took": 8540,
"timed_out": false,
"_shards": {
"total": 153,
"successful": 153,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 62143,
"max_score": 0.0,
"hits": [
]
},
"aggregations": {
"pods": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "runner-gitlab-runner-7bbfb5dcb5-p6smb",
"doc_count": 19795,
"containers": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "runner-gitlab-runner",
"doc_count": 19795
}
]
}
},
{
"key": "elastic-stack-elasticsearch-master-1",
"doc_count": 13185,
"containers": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "elasticsearch",
"doc_count": 13158
},
{
"key": "chown",
"doc_count": 24
},
{
"key": "sysctl",
"doc_count": 3
}
]
}
},
{
"key": "ingress-nginx-ingress-controller-76449bcc8d-8qgl6",
"doc_count": 3437,
"containers": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "nginx-ingress-controller",
"doc_count": 3437
}
]
}
}
]
}
}
}
......@@ -2,7 +2,7 @@
require 'spec_helper'
describe Gitlab::Elasticsearch::Logs do
describe Gitlab::Elasticsearch::Logs::Lines do
let(:client) { Elasticsearch::Transport::Client }
let(:es_message_1) { { timestamp: "2019-12-13T14:35:34.034Z", pod: "production-6866bc8974-m4sk4", message: "10.8.2.1 - - [25/Oct/2019:08:03:22 UTC] \"GET / HTTP/1.1\" 200 13" } }
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Elasticsearch::Logs::Pods do
let(:client) { Elasticsearch::Transport::Client }
let(:es_query) { JSON.parse(fixture_file('lib/elasticsearch/pods_query.json'), symbolize_names: true) }
let(:es_response) { JSON.parse(fixture_file('lib/elasticsearch/pods_response.json')) }
let(:namespace) { "autodevops-deploy-9-production" }
subject { described_class.new(client) }
describe '#pods' do
it 'returns the pods' do
expect(client).to receive(:search).with(body: es_query).and_return(es_response)
result = subject.pods(namespace)
expect(result).to eq([
{
name: "runner-gitlab-runner-7bbfb5dcb5-p6smb",
container_names: %w[runner-gitlab-runner]
},
{
name: "elastic-stack-elasticsearch-master-1",
container_names: %w[elasticsearch chown sysctl]
},
{
name: "ingress-nginx-ingress-controller-76449bcc8d-8qgl6",
container_names: %w[nginx-ingress-controller]
}
])
end
end
end
......@@ -13,10 +13,16 @@ describe ::PodLogs::BaseService do
let(:container_name) { 'container-0' }
let(:params) { {} }
let(:raw_pods) do
JSON.parse([
kube_pod(name: pod_name),
kube_pod(name: pod_name_2)
].to_json, object_class: OpenStruct)
[
{
name: pod_name,
container_names: %w(container-0-0 container-0-1)
},
{
name: pod_name_2,
container_names: %w(container-1-0 container-1-1)
}
]
end
subject { described_class.new(cluster, namespace, params: params) }
......@@ -99,19 +105,6 @@ describe ::PodLogs::BaseService do
end
end
describe '#get_raw_pods' do
let(:service) { create(:cluster_platform_kubernetes, :configured) }
it 'returns success with passthrough k8s response' do
stub_kubeclient_pods(namespace)
result = subject.send(:get_raw_pods, {})
expect(result[:status]).to eq(:success)
expect(result[:raw_pods].first).to be_a(Kubeclient::Resource)
end
end
describe '#get_pod_names' do
it 'returns success with a list of pods' do
result = subject.send(:get_pod_names, raw_pods: raw_pods)
......
......@@ -21,8 +21,63 @@ describe ::PodLogs::ElasticsearchService do
]
end
let(:raw_pods) do
[
{
name: pod_name,
container_names: [container_name, "#{container_name}-1"]
}
]
end
subject { described_class.new(cluster, namespace, params: params) }
describe '#get_raw_pods' do
before do
create(:clusters_applications_elastic_stack, :installed, cluster: cluster)
end
it 'returns success with elasticsearch response' do
allow_any_instance_of(::Clusters::Applications::ElasticStack)
.to receive(:elasticsearch_client)
.and_return(Elasticsearch::Transport::Client.new)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs::Pods)
.to receive(:pods)
.with(namespace)
.and_return(raw_pods)
result = subject.send(:get_raw_pods, {})
expect(result[:status]).to eq(:success)
expect(result[:raw_pods]).to eq(raw_pods)
end
it 'returns an error when ES is unreachable' do
allow_any_instance_of(::Clusters::Applications::ElasticStack)
.to receive(:elasticsearch_client)
.and_return(nil)
result = subject.send(:get_raw_pods, {})
expect(result[:status]).to eq(:error)
expect(result[:message]).to eq('Unable to connect to Elasticsearch')
end
it 'handles server errors from elasticsearch' do
allow_any_instance_of(::Clusters::Applications::ElasticStack)
.to receive(:elasticsearch_client)
.and_return(Elasticsearch::Transport::Client.new)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs::Pods)
.to receive(:pods)
.and_raise(Elasticsearch::Transport::Transport::Errors::ServiceUnavailable.new)
result = subject.send(:get_raw_pods, {})
expect(result[:status]).to eq(:error)
expect(result[:message]).to eq('Elasticsearch returned status code: ServiceUnavailable')
end
end
describe '#check_times' do
context 'with start and end provided and valid' do
let(:params) do
......@@ -168,7 +223,7 @@ describe ::PodLogs::ElasticsearchService do
allow_any_instance_of(::Clusters::Applications::ElasticStack)
.to receive(:elasticsearch_client)
.and_return(Elasticsearch::Transport::Client.new)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs::Lines)
.to receive(:pod_logs)
.with(namespace, pod_name: pod_name, container_name: container_name, search: search, start_time: start_time, end_time: end_time, cursor: cursor)
.and_return({ logs: expected_logs, cursor: expected_cursor })
......@@ -195,7 +250,7 @@ describe ::PodLogs::ElasticsearchService do
allow_any_instance_of(::Clusters::Applications::ElasticStack)
.to receive(:elasticsearch_client)
.and_return(Elasticsearch::Transport::Client.new)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs::Lines)
.to receive(:pod_logs)
.and_raise(Elasticsearch::Transport::Transport::Errors::ServiceUnavailable.new)
......@@ -209,9 +264,9 @@ describe ::PodLogs::ElasticsearchService do
allow_any_instance_of(::Clusters::Applications::ElasticStack)
.to receive(:elasticsearch_client)
.and_return(Elasticsearch::Transport::Client.new)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs::Lines)
.to receive(:pod_logs)
.and_raise(::Gitlab::Elasticsearch::Logs::InvalidCursor.new)
.and_raise(::Gitlab::Elasticsearch::Logs::Lines::InvalidCursor.new)
result = subject.send(:pod_logs, result_arg)
......
......@@ -20,14 +20,36 @@ describe ::PodLogs::KubernetesService do
end
let(:raw_pods) do
JSON.parse([
kube_pod(name: pod_name),
kube_pod(name: pod_name_2, container_name: container_name_2)
].to_json, object_class: OpenStruct)
[
{
name: pod_name,
container_names: [container_name, "#{container_name}-1"]
},
{
name: pod_name_2,
container_names: [container_name_2, "#{container_name_2}-1"]
}
]
end
subject { described_class.new(cluster, namespace, params: params) }
describe '#get_raw_pods' do
let(:service) { create(:cluster_platform_kubernetes, :configured) }
it 'returns success with passthrough k8s response' do
stub_kubeclient_pods(namespace)
result = subject.send(:get_raw_pods, {})
expect(result[:status]).to eq(:success)
expect(result[:raw_pods]).to eq([{
name: 'kube-pod',
container_names: %w(container-0 container-0-1)
}])
end
end
describe '#pod_logs' do
let(:result_arg) do
{
......@@ -233,7 +255,7 @@ describe ::PodLogs::KubernetesService do
end
it 'returns error if container_name was not specified and there are no containers on the pod' do
raw_pods.first.spec.containers = []
raw_pods.first[:container_names] = []
result = subject.send(:check_container_name,
pod_name: pod_name,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment