Commit b67486fe authored by Mayra Cabrera

Merge branch '207280-batch-load-db-bulk-indexer-records' into 'master'

Preload DB records in bulk ES indexing

See merge request gitlab-org/gitlab!26754
parents c81b69bf 86846aed
...@@ -67,7 +67,8 @@ module Elastic ...@@ -67,7 +67,8 @@ module Elastic
last_score: last_score last_score: last_score
) )
specs.each { |spec, _| submit_document(spec) } refs = deserialize_all(specs)
refs.preload_database_records.each { |ref| submit_document(ref) }
failures = bulk_indexer.flush failures = bulk_indexer.flush
# Re-enqueue any failures so they are retried # Re-enqueue any failures so they are retried
...@@ -85,10 +86,10 @@ module Elastic ...@@ -85,10 +86,10 @@ module Elastic
) )
end end
def submit_document(spec) def deserialize_all(specs)
ref = ::Gitlab::Elastic::DocumentReference.deserialize(spec) refs = ::Gitlab::Elastic::DocumentReference::Collection.new
specs.each do |spec, _|
bulk_indexer.process(ref) refs.deserialize_and_add(spec)
rescue ::Gitlab::Elastic::DocumentReference::InvalidError => err rescue ::Gitlab::Elastic::DocumentReference::InvalidError => err
logger.warn( logger.warn(
message: 'submit_document_failed', message: 'submit_document_failed',
...@@ -98,6 +99,13 @@ module Elastic ...@@ -98,6 +99,13 @@ module Elastic
) )
end end
refs
end
# Hands one document reference to the shared bulk indexer by calling
# bulk_indexer.process(ref).
#
# ref - the reference object to pass to the indexer.
#       NOTE(review): presumably a ::Gitlab::Elastic::DocumentReference whose
#       database record has already been preloaded - confirm against callers.
def submit_document(ref)
bulk_indexer.process(ref)
end
def bulk_indexer def bulk_indexer
@bulk_indexer ||= ::Gitlab::Elastic::BulkIndexer.new(logger: logger) @bulk_indexer ||= ::Gitlab::Elastic::BulkIndexer.new(logger: logger)
end end
......
---
title: Preload database records in bulk indexing
merge_request: 26754
author:
type: performance
...@@ -12,6 +12,36 @@ module Gitlab ...@@ -12,6 +12,36 @@ module Gitlab
InvalidError = Class.new(StandardError) InvalidError = Class.new(StandardError)
# An Enumerable list of document references that can resolve the database
# records of all its members in bulk, one lookup per referenced class.
class Collection
  include Enumerable

  def initialize
    @refs = []
  end

  # Deserializes +string+ through DocumentReference.deserialize and appends
  # the resulting reference. Errors raised by the deserializer propagate to
  # the caller. Returns the internal array of references.
  def deserialize_and_add(string)
    reference = ::Gitlab::Elastic::DocumentReference.deserialize(string)
    @refs.push(reference)
  end

  # Yields every stored reference in insertion order.
  def each(&blk)
    @refs.each(&blk)
  end

  # Assigns +database_record+ on every stored reference.
  #
  # References are grouped by their +klass+, and +klass.id_in+ is called once
  # per group with all of that group's ids. References whose id is not among
  # the returned records are assigned nil.
  #
  # Returns self so that calls can be chained.
  def preload_database_records
    refs_by_class = @refs.group_by { |ref| ref.klass }

    refs_by_class.each_pair do |model, model_refs|
      wanted_ids = model_refs.map { |ref| ref.db_id }

      lookup = {}
      model.id_in(wanted_ids).each { |row| lookup[row.id] = row }

      # db_id is converted with to_i so it matches the keys taken from row.id.
      model_refs.each { |ref| ref.database_record = lookup[ref.db_id.to_i] }
    end

    self
  end
end
class << self class << self
def build(instance) def build(instance)
new(instance.class, instance.id, instance.es_id, instance.es_parent) new(instance.class, instance.id, instance.es_id, instance.es_parent)
...@@ -87,6 +117,10 @@ module Gitlab ...@@ -87,6 +117,10 @@ module Gitlab
strong_memoize(:database_record) { klass.find_by_id(db_id) } strong_memoize(:database_record) { klass.find_by_id(db_id) }
end end
# Writer for the memoized database record: stores +record+ under the
# :database_record memoization key, the same key the #database_record reader
# uses.
#
# NOTE(review): presumably strong_memoize keeps a value that is already
# memoized, in which case this writer has no effect once the record has been
# read or assigned - confirm against the StrongMemoize helper.
def database_record=(record)
strong_memoize(:database_record) { record }
end
def serialize def serialize
self.class.serialize_array([klass_name, db_id, es_id, es_parent].compact) self.class.serialize_array([klass_name, db_id, es_id, es_parent].compact)
end end
......
...@@ -167,4 +167,58 @@ describe Gitlab::Elastic::DocumentReference do ...@@ -167,4 +167,58 @@ describe Gitlab::Elastic::DocumentReference do
expect(project_as_ref.serialize).to eq(project_as_str) expect(project_as_ref.serialize).to eq(project_as_str)
end end
end end
describe '::Collection' do
  it 'contains a collection of DocumentReference' do
    # Three references built from identical arguments, added through their
    # serialized form; the collection must keep every one of them.
    refs = Array.new(3) { described_class.new(Integer, 1, 'integer_1') }

    collection = described_class::Collection.new
    refs.each { |ref| collection.deserialize_and_add(ref.serialize) }

    expect(collection.count).to eq(3)
    expect(collection.first).to eq(refs.first)
  end

  describe '#preload_database_records' do
    let(:issue1) { create(:issue) }
    let(:issue2) { create(:issue) }
    let(:note1) { create(:note) }
    let(:note2) { create(:note) }
    # A note whose row has been removed, so its reference has no record to load.
    let(:note_deleted) { create(:note).tap(&:delete) }

    let(:issue_ref1) { described_class.new(Issue, issue1.id, issue1.es_id, issue1.es_parent) }
    let(:issue_ref2) { described_class.new(Issue, issue2.id, issue2.es_id, issue2.es_parent) }
    let(:note_ref1) { described_class.new(Note, note1.id, note1.es_id, note1.es_parent) }
    let(:note_ref2) { described_class.new(Note, note2.id, note2.es_id, note2.es_parent) }
    let(:note_ref_deleted) { described_class.new(Note, note_deleted.id, note_deleted.es_id, note_deleted.es_parent) }

    it 'preloads database records in one query per type' do
      collection = described_class::Collection.new

      [issue_ref1, issue_ref2, note_ref1, note_ref2, note_ref_deleted].each do |ref|
        collection.deserialize_and_add(ref.serialize)
      end

      loaded = nil

      # Two referenced classes (Issue and Note) are in play, hence the limit of 2.
      preload = -> { loaded = collection.preload_database_records.map(&:database_record) }
      expect(&preload).not_to exceed_query_limit(2)

      # The deleted note's reference resolves to nil.
      expect(loaded).to eq([issue1, issue2, note1, note2, nil])
    end
  end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment