Commit 4c8931cb authored by Adrien Kohlbecker's avatar Adrien Kohlbecker Committed by James Fargher

Add pagination to logs API

parent 1b09ecde
...@@ -48,7 +48,7 @@ module Projects ...@@ -48,7 +48,7 @@ module Projects
end end
def elasticsearch_params def elasticsearch_params
params.permit(:container_name, :pod_name, :search, :start, :end) params.permit(:container_name, :pod_name, :search, :start, :end, :cursor)
end end
def environment def environment
......
...@@ -10,8 +10,6 @@ module PodLogs ...@@ -10,8 +10,6 @@ module PodLogs
CACHE_KEY_GET_POD_LOG = 'get_pod_log' CACHE_KEY_GET_POD_LOG = 'get_pod_log'
K8S_NAME_MAX_LENGTH = 253 K8S_NAME_MAX_LENGTH = 253
SUCCESS_RETURN_KEYS = %i(status logs pod_name container_name pods).freeze
def id def id
cluster.id cluster.id
end end
...@@ -49,6 +47,10 @@ module PodLogs ...@@ -49,6 +47,10 @@ module PodLogs
%w(pod_name container_name) %w(pod_name container_name)
end end
def success_return_keys
%i(status logs pod_name container_name pods)
end
def check_arguments(result) def check_arguments(result)
return error(_('Cluster does not exist')) if cluster.nil? return error(_('Cluster does not exist')) if cluster.nil?
return error(_('Namespace is empty')) if namespace.blank? return error(_('Namespace is empty')) if namespace.blank?
...@@ -122,7 +124,7 @@ module PodLogs ...@@ -122,7 +124,7 @@ module PodLogs
end end
def filter_return_keys(result) def filter_return_keys(result)
result.slice(*SUCCESS_RETURN_KEYS) result.slice(*success_return_keys)
end end
def filter_params(params) def filter_params(params)
......
...@@ -10,6 +10,7 @@ module PodLogs ...@@ -10,6 +10,7 @@ module PodLogs
:check_container_name, :check_container_name,
:check_times, :check_times,
:check_search, :check_search,
:check_cursor,
:pod_logs, :pod_logs,
:filter_return_keys :filter_return_keys
...@@ -18,7 +19,11 @@ module PodLogs ...@@ -18,7 +19,11 @@ module PodLogs
private private
def valid_params def valid_params
%w(pod_name container_name search start end) super + %w(search start end cursor)
end
def success_return_keys
super + %i(cursor)
end end
def check_times(result) def check_times(result)
...@@ -36,19 +41,28 @@ module PodLogs ...@@ -36,19 +41,28 @@ module PodLogs
success(result) success(result)
end end
def check_cursor(result)
result[:cursor] = params['cursor'] if params.key?('cursor')
success(result)
end
def pod_logs(result) def pod_logs(result)
client = cluster&.application_elastic_stack&.elasticsearch_client client = cluster&.application_elastic_stack&.elasticsearch_client
return error(_('Unable to connect to Elasticsearch')) unless client return error(_('Unable to connect to Elasticsearch')) unless client
result[:logs] = ::Gitlab::Elasticsearch::Logs.new(client).pod_logs( response = ::Gitlab::Elasticsearch::Logs.new(client).pod_logs(
namespace, namespace,
result[:pod_name], result[:pod_name],
result[:container_name], container_name: result[:container_name],
result[:search], search: result[:search],
result[:start], start_time: result[:start],
result[:end] end_time: result[:end],
cursor: result[:cursor]
) )
result.merge!(response)
success(result) success(result)
rescue Elasticsearch::Transport::Transport::ServerError => e rescue Elasticsearch::Transport::Transport::ServerError => e
::Gitlab::ErrorTracking.track_exception(e) ::Gitlab::ErrorTracking.track_exception(e)
...@@ -58,6 +72,8 @@ module PodLogs ...@@ -58,6 +72,8 @@ module PodLogs
# there is no method on the exception other than the class name to determine the type of error encountered. # there is no method on the exception other than the class name to determine the type of error encountered.
status_code: e.class.name.split('::').last status_code: e.class.name.split('::').last
}) })
rescue ::Gitlab::Elasticsearch::Logs::InvalidCursor
error(_('Invalid cursor value provided'))
end end
end end
end end
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
module Gitlab module Gitlab
module Elasticsearch module Elasticsearch
class Logs class Logs
InvalidCursor = Class.new(RuntimeError)
# How many log lines to fetch in a query # How many log lines to fetch in a query
LOGS_LIMIT = 500 LOGS_LIMIT = 500
...@@ -10,7 +12,7 @@ module Gitlab ...@@ -10,7 +12,7 @@ module Gitlab
@client = client @client = client
end end
def pod_logs(namespace, pod_name, container_name = nil, search = nil, start_time = nil, end_time = nil) def pod_logs(namespace, pod_name, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
query = { bool: { must: [] } }.tap do |q| query = { bool: { must: [] } }.tap do |q|
filter_pod_name(q, pod_name) filter_pod_name(q, pod_name)
filter_namespace(q, namespace) filter_namespace(q, namespace)
...@@ -19,7 +21,7 @@ module Gitlab ...@@ -19,7 +21,7 @@ module Gitlab
filter_times(q, start_time, end_time) filter_times(q, start_time, end_time)
end end
body = build_body(query) body = build_body(query, cursor)
response = @client.search body: body response = @client.search body: body
format_response(response) format_response(response)
...@@ -27,8 +29,8 @@ module Gitlab ...@@ -27,8 +29,8 @@ module Gitlab
private private
def build_body(query) def build_body(query, cursor = nil)
{ body = {
query: query, query: query,
# reverse order so we can query N-most recent records # reverse order so we can query N-most recent records
sort: [ sort: [
...@@ -40,6 +42,12 @@ module Gitlab ...@@ -40,6 +42,12 @@ module Gitlab
# fixed limit for now, we should support paginated queries # fixed limit for now, we should support paginated queries
size: ::Gitlab::Elasticsearch::Logs::LOGS_LIMIT size: ::Gitlab::Elasticsearch::Logs::LOGS_LIMIT
} }
unless cursor.nil?
body[:search_after] = decode_cursor(cursor)
end
body
end end
def filter_pod_name(query, pod_name) def filter_pod_name(query, pod_name)
...@@ -100,7 +108,9 @@ module Gitlab ...@@ -100,7 +108,9 @@ module Gitlab
end end
def format_response(response) def format_response(response)
result = response.fetch("hits", {}).fetch("hits", []).map do |hit| results = response.fetch("hits", {}).fetch("hits", [])
last_result = results.last
results = results.map do |hit|
{ {
timestamp: hit["_source"]["@timestamp"], timestamp: hit["_source"]["@timestamp"],
message: hit["_source"]["message"] message: hit["_source"]["message"]
...@@ -108,7 +118,32 @@ module Gitlab ...@@ -108,7 +118,32 @@ module Gitlab
end end
# we queried for the N-most recent records but we want them ordered oldest to newest # we queried for the N-most recent records but we want them ordered oldest to newest
result.reverse {
logs: results.reverse,
cursor: last_result.nil? ? nil : encode_cursor(last_result["sort"])
}
end
# we want to hide the implementation details of the search_after parameter from the frontend
# behind a single easily transmitted value
def encode_cursor(obj)
obj.join(',')
end
def decode_cursor(obj)
cursor = obj.split(',').map(&:to_i)
unless valid_cursor(cursor)
raise InvalidCursor, "invalid cursor format"
end
cursor
end
def valid_cursor(cursor)
cursor.instance_of?(Array) &&
cursor.length == 2 &&
cursor.map {|i| i.instance_of?(Integer)}.reduce(:&)
end end
end end
end end
......
...@@ -10892,6 +10892,9 @@ msgstr "" ...@@ -10892,6 +10892,9 @@ msgstr ""
msgid "Invalid URL" msgid "Invalid URL"
msgstr "" msgstr ""
msgid "Invalid cursor value provided"
msgstr ""
msgid "Invalid date" msgid "Invalid date"
msgstr "" msgstr ""
......
{
"query": {
"bool": {
"must": [
{
"match_phrase": {
"kubernetes.pod.name": {
"query": "production-6866bc8974-m4sk4"
}
}
},
{
"match_phrase": {
"kubernetes.namespace": {
"query": "autodevops-deploy-9-production"
}
}
}
]
}
},
"sort": [
{
"@timestamp": {
"order": "desc"
}
},
{
"offset": {
"order": "desc"
}
}
],
"search_after": [
9999934,
1572449784442
],
"_source": [
"@timestamp",
"message"
],
"size": 500
}
...@@ -20,6 +20,7 @@ describe Gitlab::Elasticsearch::Logs do ...@@ -20,6 +20,7 @@ describe Gitlab::Elasticsearch::Logs do
let(:search) { "foo +bar "} let(:search) { "foo +bar "}
let(:start_time) { "2019-12-13T14:35:34.034Z" } let(:start_time) { "2019-12-13T14:35:34.034Z" }
let(:end_time) { "2019-12-13T14:35:34.034Z" } let(:end_time) { "2019-12-13T14:35:34.034Z" }
let(:cursor) { "9999934,1572449784442" }
let(:body) { JSON.parse(fixture_file('lib/elasticsearch/query.json')) } let(:body) { JSON.parse(fixture_file('lib/elasticsearch/query.json')) }
let(:body_with_container) { JSON.parse(fixture_file('lib/elasticsearch/query_with_container.json')) } let(:body_with_container) { JSON.parse(fixture_file('lib/elasticsearch/query_with_container.json')) }
...@@ -27,6 +28,7 @@ describe Gitlab::Elasticsearch::Logs do ...@@ -27,6 +28,7 @@ describe Gitlab::Elasticsearch::Logs do
let(:body_with_times) { JSON.parse(fixture_file('lib/elasticsearch/query_with_times.json')) } let(:body_with_times) { JSON.parse(fixture_file('lib/elasticsearch/query_with_times.json')) }
let(:body_with_start_time) { JSON.parse(fixture_file('lib/elasticsearch/query_with_start_time.json')) } let(:body_with_start_time) { JSON.parse(fixture_file('lib/elasticsearch/query_with_start_time.json')) }
let(:body_with_end_time) { JSON.parse(fixture_file('lib/elasticsearch/query_with_end_time.json')) } let(:body_with_end_time) { JSON.parse(fixture_file('lib/elasticsearch/query_with_end_time.json')) }
let(:body_with_cursor) { JSON.parse(fixture_file('lib/elasticsearch/query_with_cursor.json')) }
RSpec::Matchers.define :a_hash_equal_to_json do |expected| RSpec::Matchers.define :a_hash_equal_to_json do |expected|
match do |actual| match do |actual|
...@@ -39,42 +41,49 @@ describe Gitlab::Elasticsearch::Logs do ...@@ -39,42 +41,49 @@ describe Gitlab::Elasticsearch::Logs do
expect(client).to receive(:search).with(body: a_hash_equal_to_json(body)).and_return(es_response) expect(client).to receive(:search).with(body: a_hash_equal_to_json(body)).and_return(es_response)
result = subject.pod_logs(namespace, pod_name) result = subject.pod_logs(namespace, pod_name)
expect(result).to eq([es_message_4, es_message_3, es_message_2, es_message_1]) expect(result).to eq(logs: [es_message_4, es_message_3, es_message_2, es_message_1], cursor: cursor)
end end
it 'can further filter the logs by container name' do it 'can further filter the logs by container name' do
expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_container)).and_return(es_response) expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_container)).and_return(es_response)
result = subject.pod_logs(namespace, pod_name, container_name) result = subject.pod_logs(namespace, pod_name, container_name: container_name)
expect(result).to eq([es_message_4, es_message_3, es_message_2, es_message_1]) expect(result).to eq(logs: [es_message_4, es_message_3, es_message_2, es_message_1], cursor: cursor)
end end
it 'can further filter the logs by search' do it 'can further filter the logs by search' do
expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_search)).and_return(es_response) expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_search)).and_return(es_response)
result = subject.pod_logs(namespace, pod_name, nil, search) result = subject.pod_logs(namespace, pod_name, search: search)
expect(result).to eq([es_message_4, es_message_3, es_message_2, es_message_1]) expect(result).to eq(logs: [es_message_4, es_message_3, es_message_2, es_message_1], cursor: cursor)
end end
it 'can further filter the logs by start_time and end_time' do it 'can further filter the logs by start_time and end_time' do
expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_times)).and_return(es_response) expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_times)).and_return(es_response)
result = subject.pod_logs(namespace, pod_name, nil, nil, start_time, end_time) result = subject.pod_logs(namespace, pod_name, start_time: start_time, end_time: end_time)
expect(result).to eq([es_message_4, es_message_3, es_message_2, es_message_1]) expect(result).to eq(logs: [es_message_4, es_message_3, es_message_2, es_message_1], cursor: cursor)
end end
it 'can further filter the logs by only start_time' do it 'can further filter the logs by only start_time' do
expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_start_time)).and_return(es_response) expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_start_time)).and_return(es_response)
result = subject.pod_logs(namespace, pod_name, nil, nil, start_time) result = subject.pod_logs(namespace, pod_name, start_time: start_time)
expect(result).to eq([es_message_4, es_message_3, es_message_2, es_message_1]) expect(result).to eq(logs: [es_message_4, es_message_3, es_message_2, es_message_1], cursor: cursor)
end end
it 'can further filter the logs by only end_time' do it 'can further filter the logs by only end_time' do
expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_end_time)).and_return(es_response) expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_end_time)).and_return(es_response)
result = subject.pod_logs(namespace, pod_name, nil, nil, nil, end_time) result = subject.pod_logs(namespace, pod_name, end_time: end_time)
expect(result).to eq([es_message_4, es_message_3, es_message_2, es_message_1]) expect(result).to eq(logs: [es_message_4, es_message_3, es_message_2, es_message_1], cursor: cursor)
end
it 'can search after a cursor' do
expect(client).to receive(:search).with(body: a_hash_equal_to_json(body_with_cursor)).and_return(es_response)
result = subject.pod_logs(namespace, pod_name, cursor: cursor)
expect(result).to eq(logs: [es_message_4, es_message_3, es_message_2, es_message_1], cursor: cursor)
end end
end end
end end
...@@ -11,6 +11,7 @@ describe ::PodLogs::ElasticsearchService do ...@@ -11,6 +11,7 @@ describe ::PodLogs::ElasticsearchService do
let(:search) { 'foo -bar' } let(:search) { 'foo -bar' }
let(:start_time) { '2019-01-02T12:13:14+02:00' } let(:start_time) { '2019-01-02T12:13:14+02:00' }
let(:end_time) { '2019-01-03T12:13:14+02:00' } let(:end_time) { '2019-01-03T12:13:14+02:00' }
let(:cursor) { '9999934,1572449784442' }
let(:params) { {} } let(:params) { {} }
let(:expected_logs) do let(:expected_logs) do
[ [
...@@ -116,6 +117,36 @@ describe ::PodLogs::ElasticsearchService do ...@@ -116,6 +117,36 @@ describe ::PodLogs::ElasticsearchService do
end end
end end
describe '#check_cursor' do
context 'with cursor provided and valid' do
let(:params) do
{
'cursor' => cursor
}
end
it 'returns success with cursor' do
result = subject.send(:check_cursor, {})
expect(result[:status]).to eq(:success)
expect(result[:cursor]).to eq(cursor)
end
end
context 'with cursor not provided' do
let(:params) do
{}
end
it 'returns success with nothing else' do
result = subject.send(:check_cursor, {})
expect(result.keys.length).to eq(1)
expect(result[:status]).to eq(:success)
end
end
end
describe '#pod_logs' do describe '#pod_logs' do
let(:result_arg) do let(:result_arg) do
{ {
...@@ -123,9 +154,11 @@ describe ::PodLogs::ElasticsearchService do ...@@ -123,9 +154,11 @@ describe ::PodLogs::ElasticsearchService do
container_name: container_name, container_name: container_name,
search: search, search: search,
start: start_time, start: start_time,
end: end_time end: end_time,
cursor: cursor
} }
end end
let(:expected_cursor) { '9999934,1572449784442' }
before do before do
create(:clusters_applications_elastic_stack, :installed, cluster: cluster) create(:clusters_applications_elastic_stack, :installed, cluster: cluster)
...@@ -137,13 +170,14 @@ describe ::PodLogs::ElasticsearchService do ...@@ -137,13 +170,14 @@ describe ::PodLogs::ElasticsearchService do
.and_return(Elasticsearch::Transport::Client.new) .and_return(Elasticsearch::Transport::Client.new)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs) allow_any_instance_of(::Gitlab::Elasticsearch::Logs)
.to receive(:pod_logs) .to receive(:pod_logs)
.with(namespace, pod_name, container_name, search, start_time, end_time) .with(namespace, pod_name, container_name: container_name, search: search, start_time: start_time, end_time: end_time, cursor: cursor)
.and_return(expected_logs) .and_return({ logs: expected_logs, cursor: expected_cursor })
result = subject.send(:pod_logs, result_arg) result = subject.send(:pod_logs, result_arg)
expect(result[:status]).to eq(:success) expect(result[:status]).to eq(:success)
expect(result[:logs]).to eq(expected_logs) expect(result[:logs]).to eq(expected_logs)
expect(result[:cursor]).to eq(expected_cursor)
end end
it 'returns an error when ES is unreachable' do it 'returns an error when ES is unreachable' do
...@@ -170,5 +204,19 @@ describe ::PodLogs::ElasticsearchService do ...@@ -170,5 +204,19 @@ describe ::PodLogs::ElasticsearchService do
expect(result[:status]).to eq(:error) expect(result[:status]).to eq(:error)
expect(result[:message]).to eq('Elasticsearch returned status code: ServiceUnavailable') expect(result[:message]).to eq('Elasticsearch returned status code: ServiceUnavailable')
end end
it 'handles cursor errors from elasticsearch' do
allow_any_instance_of(::Clusters::Applications::ElasticStack)
.to receive(:elasticsearch_client)
.and_return(Elasticsearch::Transport::Client.new)
allow_any_instance_of(::Gitlab::Elasticsearch::Logs)
.to receive(:pod_logs)
.and_raise(::Gitlab::Elasticsearch::Logs::InvalidCursor.new)
result = subject.send(:pod_logs, result_arg)
expect(result[:status]).to eq(:error)
expect(result[:message]).to eq('Invalid cursor value provided')
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment