Commit 2ff70c48 authored by Douglas Barbosa Alexandre

Merge branch '224133-elasticindexbulkcronworker-error-don-t-know-how-to-serialize-nilclass' into 'master'

Resolve BulkIndexer flush returning nil values

See merge request gitlab-org/gitlab!36716
parents 3288a0f5 8ae4857e
---
title: Fix BulkIndexer flush returning nil values after failure.
merge_request: 36716
author:
type: fixed
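
Why the fix works: before this change, `process` appended the ref to `ref_cache` before `submit` ran. When an incoming payload would exceed the bulk limit, `submit` flushed the existing buffer first, and a successful flush ran `reset!`, which cleared `ref_cache` (including the ref that had just been cached) while the new payload was still appended to `body`. From then on the two arrays were misaligned, so a later failure looked up `ref_cache[i]` and pushed `nil` into `failures` — the "don't know how to serialize NilClass" error in the branch name. The fix renames the array to `ref_buffer` and appends to it inside `submit`, after any early flush, so it stays index-aligned with the buffered payloads. A minimal sketch of the pre-fix desync (illustrative only; not the MR's code):

# Illustrative sketch of the old desync; all names are placeholders.
body      = []            # buffered JSON payloads
ref_cache = []            # refs assumed to line up with body

ref_cache << :ref_a       # old process() cached the ref up front
# submit(): :ref_a's payload would exceed the bulk limit, so flush first
body.clear                # a successful flush ran reset! ...
ref_cache.clear           # ...which also dropped :ref_a from the cache
body << '{"index": ...}'  # then :ref_a's payload was buffered anyway

# a later bulk failure maps response item 0 back to its ref:
failures = [ref_cache[0]] # => [nil] -- later crashes JSON serialization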
@@ -17,19 +17,24 @@ module Gitlab
       attr_reader :logger, :failures

+      # body - array of json formatted index operation requests awaiting submission to elasticsearch in bulk
+      # body_size_bytes - total size in bytes of each json element in body array
+      # failures - array of records that had a failure during submission to elasticsearch
+      # logger - set the logger used by instance
+      # ref_buffer - records awaiting submission to elasticsearch
+      #   cleared if `try_send_bulk` is successful
+      #   flushed into `failures` if `try_send_bulk` fails
       def initialize(logger:)
         @body = []
         @body_size_bytes = 0
         @failures = []
         @logger = logger
-        @ref_cache = []
+        @ref_buffer = []
       end
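
For orientation, the class's public surface as the specs below exercise it: `process` buffers one document reference and `flush` submits whatever is still buffered, returning the refs that failed. A hedged usage sketch (`Rails.logger`, `some_refs`, and `retry_later` are placeholders, not part of this MR):

indexer = Gitlab::Elastic::BulkIndexer.new(logger: Rails.logger)

some_refs.each { |ref| indexer.process(ref) } # buffers; flushes early at the bulk size limit
failed = indexer.flush                        # submits the remainder; returns failed refs
failed.each { |ref| retry_later(ref) }        # retry_later is hypothetical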
       # Adds or removes a document in elasticsearch, depending on whether the
       # database record it refers to can be found
       def process(ref)
-        ref_cache << ref
-
         if ref.database_record
           index(ref)
         else
@@ -48,23 +53,23 @@ module Gitlab
       def reset!
         @body = []
         @body_size_bytes = 0
-        @ref_cache = []
+        @ref_buffer = []
       end

-      attr_reader :body, :body_size_bytes, :ref_cache
+      attr_reader :body, :body_size_bytes, :ref_buffer

       def index(ref)
         proxy = ref.database_record.__elasticsearch__
         op = build_op(ref, proxy)

-        submit({ index: op }, proxy.as_indexed_json)
+        submit(ref, { index: op }, proxy.as_indexed_json)
       end

       def delete(ref)
         proxy = ref.klass.__elasticsearch__
         op = build_op(ref, proxy)

-        submit(delete: op)
+        submit(ref, delete: op)
       end

       def build_op(ref, proxy)
@@ -83,12 +88,17 @@ module Gitlab
         Gitlab::CurrentSettings.elasticsearch_max_bulk_size_mb.megabytes
       end

-      def submit(*hashes)
+      def submit(ref, *hashes)
         jsons = hashes.map(&:to_json)
         bytesize = calculate_bytesize(jsons)

+        # if the new ref will exceed the bulk limit, send the existing buffer of records
+        # when successful, clears `body`, `ref_buffer`, and `body_size_bytes`
+        # continue to buffer refs until the bulk limit is reached or flush is called
+        # any errors encountered are added to `failures`
         send_bulk if will_exceed_bulk_limit?(bytesize)

+        ref_buffer << ref
         body.concat(jsons)
         @body_size_bytes += bytesize
       end
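
The `jsons` accumulated here are the newline-delimited bodies the Elasticsearch Bulk API expects: an action line (`index` or `delete`), with `index` actions followed by a source document. That is why one ref can contribute one or two entries to `body` while contributing exactly one entry to `ref_buffer`. A standalone sketch (index name and fields are made up):

require 'json'

# One indexed ref -> action line + source line; one deleted ref -> action line only.
index_op  = { index:  { _index: 'gitlab-test', _id: 'issue_1' } }
source    = { title: 'Fix flush', state: 'opened' }
delete_op = { delete: { _index: 'gitlab-test', _id: 'issue_2' } }

body = [index_op.to_json, source.to_json, delete_op.to_json]
puts body.join("\n") # what a bulk request body looks like on the wire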
@@ -111,7 +121,7 @@ module Gitlab
         logger.info(
           message: 'bulk_submitted',
           body_size_bytes: body_size_bytes,
-          bulk_count: ref_cache.count,
+          bulk_count: ref_buffer.count,
           errors_count: failed_refs.count
         )
@@ -128,7 +138,7 @@ module Gitlab
         # If an exception is raised, treat the entire bulk as failed
         logger.error(message: 'bulk_exception', error_class: err.class.to_s, error_message: err.message)

-        ref_cache
+        ref_buffer
       end

       def process_errors(result)
@@ -145,7 +155,7 @@ module Gitlab
           if op.nil? || op['error']
             logger.warn(message: 'bulk_error', item: item)

-            out << ref_cache[i]
+            out << ref_buffer[i]
           end
         end
......
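`process_errors` relies on the Bulk API response carrying one entry in `items` per action, in submission order, so index `i` in the response maps straight back to `ref_buffer[i]`. A sketch of the response shape it walks (abridged; values made up, and the `item['index'] || item['delete']` lookup is an approximation of the real code):

# Abridged shape of an Elasticsearch bulk response with one failure.
result = {
  'errors' => true,
  'items' => [
    { 'index'  => { '_id' => 'issue_1', 'status' => 201 } },
    { 'delete' => { '_id' => 'issue_2', 'status' => 429,
                    'error' => { 'type' => 'es_rejected_execution_exception' } } }
  ]
}

failed_indices = []
result['items'].each_with_index do |item, i|
  op = item['index'] || item['delete']
  failed_indices << i if op.nil? || op['error'] # mirrors the check in process_errors
end
failed_indices # => [1], i.e. ref_buffer[1] failed
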
@@ -56,6 +56,7 @@ RSpec.describe Gitlab::Elastic::BulkIndexer, :elastic do
         body_bytesize = args[:body].map(&:bytesize).reduce(:+)
         expect(body_bytesize).to be <= bulk_limit_bytes
       end
+
+      expect(indexer.failures).to be_empty
     end
   end
@@ -99,6 +100,25 @@ RSpec.describe Gitlab::Elastic::BulkIndexer, :elastic do
       expect(indexer.failures).to contain_exactly(issue_as_ref, other_issue_as_ref)
     end

+    it 'fails a document correctly on exception after adding an item that exceeded the bulk limit' do
+      bulk_limit_bytes = (issue_as_json_with_times.to_json.bytesize * 1.5).to_i
+      set_bulk_limit(indexer, bulk_limit_bytes)
+      indexer.process(issue_as_ref)
+      allow(es_client).to receive(:bulk).and_return({})
+
+      indexer.process(issue_as_ref)
+
+      expect(es_client).to have_received(:bulk) do |args|
+        body_bytesize = args[:body].map(&:bytesize).reduce(:+)
+        expect(body_bytesize).to be <= bulk_limit_bytes
+      end
+
+      expect(es_client).to receive(:bulk) { raise 'An exception' }
+
+      expect(indexer.flush).to contain_exactly(issue_as_ref)
+      expect(indexer.failures).to contain_exactly(issue_as_ref)
+    end
+
     context 'indexing an issue' do
       it 'adds the issue to the index' do
         expect(indexer.process(issue_as_ref).flush).to be_empty
......